package MogileFS::Store::Postgres;
# vim: ts=4 sw=4 et ft=perl:
use strict;
use Digest::MD5 qw(md5); # Used for lockid
use DBI;
use DBD::Pg;
use Sys::Hostname;
use MogileFS::Util qw(throw debug error);
use MogileFS::Server;
use Carp;
use base 'MogileFS::Store';

# --------------------------------------------------------------------------
# Package methods we override
# --------------------------------------------------------------------------

# Build the DBI DSN for database $dbname on $host.
# ";port=$port" is appended only when a port was supplied.
sub dsn_of_dbhost {
    my ($class, $dbname, $host, $port) = @_;
    return "DBI:Pg:dbname=$dbname;host=$host" . ($port ? ";port=$port" : "");
}

# DSN for administrative ("root") connections: always the 'postgres'
# maintenance database, whatever $dbname was passed in.
sub dsn_of_root {
    my ($class, $dbname, $host, $port) = @_;
    return $class->dsn_of_dbhost('postgres', $host, $port);
}

# --------------------------------------------------------------------------
# Store-related things we override
# --------------------------------------------------------------------------

sub want_raise_errors { 1 }

# Given a root DBI connection, create the named database.  Succeeds (returns
# 1) if it is made or already exists; dies otherwise.
#
# The CREATE is wrapped in eval so "already exists" is tolerated whether or
# not $rdbh has RaiseError enabled.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    my $created = eval {
        $rdbh->do("CREATE DATABASE $dbname TEMPLATE template0 ENCODING 'UTF-8'");
    };
    return 1 if $created;

    my $errstr = $rdbh->errstr || $@ || '';
    my $state  = $rdbh->state  || '';
    # 42P04 is duplicate_database; the message match is kept as a fallback.
    return 1 if $state eq '42P04' || $errstr =~ /already exists/;
    die "Failed to create database '$dbname': $errstr\n";
}

# Create the login role $user with password $pass and make it the owner of
# $dbname.  An already-existing role (SQLSTATE 42710, duplicate_object) is
# tolerated; any other failure dies.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    eval {
        $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
                  undef, $pass);
    };
    # SQLSTATE codes are strings that may contain letters, so compare them
    # as strings, never numerically.
    die "Failed to create user '$user': ". $rdbh->errstr . "\n"
        if $rdbh->err && $rdbh->state ne '42710';
    # Owning the database is important in Postgres, so hand it to $user.
    $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
        or die "Failed to grant privileges " . $rdbh->errstr . "\n";
}

sub can_replace { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }
sub unix_timestamp { "EXTRACT(epoch FROM NOW())::int4" }

# Post-construction setup: refuse servers older than 8.4 and reset the
# advisory-lock recursion counter.
sub init {
    my $self = shift;
    $self->SUPER::init;
    my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
    # We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
    # We need >=pg-8.4 for working advisory locks.
    # Both alternatives are anchored so only the leading major.minor of the
    # version string is inspected.
    die "Postgres is too old! Must use >=postgresql-8.4!"
        if $database_version =~ /\A(?:0[0-7]\.|08\.0[0-3])/;
    $self->{lock_depth} = 0;
}

# A fresh connection holds no advisory locks, so reset the recursion counter.
sub post_dbi_connect {
    my $self = shift;
    $self->SUPER::post_dbi_connect;
    $self->{lock_depth} = 0;
}

sub can_do_slaves { 0 }

# TODO: Implement later
#sub check_slave {
#}

# Returns 1 if the last statement on the handle failed with a deadlock
# (SQLSTATE 40P01), 0 otherwise.
sub was_deadlock_error {
    my $self = shift;
    my $dbh = $self->dbh;
    return 0 unless $dbh->err;
    return $dbh->state eq '40P01' ? 1 : 0;
}

# Returns 1 if the last statement on the handle failed with a unique
# violation (SQLSTATE 23505, or an error message mentioning "duplicate"),
# 0 otherwise.
sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;
    return 0 unless $dbh->err;
    return 1 if $dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i;
    return 0;
}

# Returns 1 if a table named $table is visible through DBI's table_info,
# 0 if not, and undef if the lookup itself died.
sub table_exists {
    my ($self, $table) = @_;
    return eval {
        my $sth = $self->dbh->table_info(undef, undef, $table, "table");
        my $rec = $sth->fetchrow_hashref;
        return $rec ? 1 : 0;
    };
}

# Register the Postgres-only 'lock' table before the generic schema setup.
sub setup_database {
    my $self = shift;
    $self->add_extra_tables('lock');
    return $self->SUPER::setup_database;
}

# Translate MySQL-flavoured CREATE TABLE text into Postgres syntax:
#   UNSIGNED is dropped, VARBINARY(n) becomes bytea, TINYINT/MEDIUMINT become
#   SMALLINT, "INT NOT NULL AUTO_INCREMENT" becomes SERIAL, and "# " comments
#   become "-- ".
# When an INDEXES_<table> method exists, inline INDEX clauses are stripped
# (presumably so the base class creates them from INDEXES_<table> instead).
# In BIGINT fid mode, "fid INT" is widened to "fid BIGINT".
# Dies if no table name can be found in $sql.
sub filter_create_sql {
    my ($self, $sql) = @_;
    $sql =~ s/\bUNSIGNED\b//g;
    $sql =~ s/\bVARBINARY\(\d+\)/bytea/g;
    $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
    $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
    $sql =~ s/# /-- /g;

    my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
    die "didn't find table" unless $table;
    my $index = sprintf 'INDEXES_%s', $table;
    if ($self->can($index)) {
        $sql =~ s!,\s*INDEX\s*(\w+)?\s*\(.+?\)!!mgi;
    }

    # Allow 64-bit ids for file IDs
    $sql =~ s!\bfid\s+INT\b!fid BIGINT!i if $self->fid_type eq "BIGINT";

    return $sql;
}

sub TABLE_file {
    "CREATE TABLE file (
    fid INT NOT NULL,
    PRIMARY KEY (fid),

    dmid SMALLINT NOT NULL,
    dkey VARCHAR(255), -- domain-defined
    UNIQUE (dmid, dkey),

    length BIGINT, -- big limit
    CHECK (length >= 0),

    classid SMALLINT NOT NULL,
    devcount SMALLINT NOT NULL
    )"
}

sub INDEXES_file {
    "CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
}

sub INDEXES_unreachable_fids {
    "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
}

sub INDEXES_file_on {
    "CREATE INDEX file_on_devid ON file_on (devid)"
}

sub TABLE_host {
    "CREATE TABLE host (
    hostid SMALLINT NOT NULL,
    PRIMARY KEY (hostid),
    CHECK (hostid >= 0),

    status VARCHAR(8),
    CHECK (status IN ('alive','dead','down')),

    http_port INT DEFAULT 7500,
    CHECK (http_port >= 0),
    CHECK (http_port < 65536),

    http_get_port INT,
    CHECK (http_get_port >= 0),
    CHECK (http_get_port < 65536),

    hostname VARCHAR(40),
    UNIQUE (hostname),
    hostip VARCHAR(15),
    UNIQUE (hostip),
    altip VARCHAR(15),
    UNIQUE (altip),
    altmask VARCHAR(18)
    )"
}

sub TABLE_device {
    "CREATE TABLE device (
    devid SMALLINT NOT NULL,
    PRIMARY KEY (devid),
    CHECK (devid >= 0),

    hostid SMALLINT NOT NULL,

    status VARCHAR(8),
    CHECK (status IN ('alive','dead','down','readonly','drain')),
    weight INT DEFAULT 100,

    mb_total INT,
    CHECK (mb_total >= 0),
    mb_used INT,
    CHECK (mb_used >= 0),
    mb_asof INT
    CHECK (mb_asof >= 0)
    )"
}

sub INDEXES_device {
    "CREATE INDEX device_status ON device (status)"
}

sub INDEXES_file_to_replicate {
    "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
}

sub INDEXES_file_to_delete2 {
    "CREATE INDEX file_to_delete2_nexttry ON file_to_delete2 (nexttry)"
}

sub INDEXES_file_to_delete_later {
    "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
}

sub INDEXES_fsck_log {
    "CREATE INDEX fsck_log_utime ON fsck_log (utime)"
}

sub INDEXES_file_to_queue {
    "CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)"
}

# Extra table
sub TABLE_lock {
    "CREATE TABLE lock (
    lockid INT NOT NULL,
    PRIMARY KEY (lockid),
    CHECK (lockid >= 0),

    hostname VARCHAR(255) NOT NULL,

    pid INT NOT NULL,
    CHECK (pid >= 0),

    acquiredat INT NOT NULL,
    CHECK (acquiredat >= 0)
    )"
}

# Replace the CHECK constraint on $table.status with one allowing exactly
# @statuses.  Postgres has no MODIFY COLUMN, so the existing constraint
# ($old_name, if any) is dropped and a new one is added.  Both statements run
# in one transaction that is rolled back, and the error rethrown, if either
# fails.  The new constraint reuses $old_name, or "${table}_status_check"
# when there was no previous constraint.
sub _pg_replace_status_check {
    my ($self, $table, $old_name, @statuses) = @_;
    my $dbh = $self->dbh;
    my $new_name = defined $old_name ? $old_name : "${table}_status_check";
    my $allowed = join ', ', map { "'$_'" } @statuses;

    $dbh->begin_work;
    my $ok = eval {
        $self->dowell("ALTER TABLE $table DROP CONSTRAINT " . $dbh->quote_identifier($old_name))
            if defined $old_name;
        $self->dowell("ALTER TABLE $table ADD CONSTRAINT " . $dbh->quote_identifier($new_name)
                      . " CHECK(status IN ($allowed))");
        $dbh->commit;
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error';
        eval { $dbh->rollback };
        die $error;
    }
    return 1;
}

# Add host.http_get_port if this schema predates it.
sub upgrade_add_host_getport {
    my $self = shift;
    # see if they have the get port, else update it
    unless ($self->column_type("host", "http_get_port")) {
        $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
    }
}

# Add host.altip / host.altmask (altip unique) if this schema predates them.
sub upgrade_add_host_altip {
    my $self = shift;
    unless ($self->column_type("host", "altip")) {
        $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
        $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
        # Postgres syntax; MySQL's "ADD UNIQUE name (col)" is rejected here.
        $self->dowell("ALTER TABLE host ADD UNIQUE (altip)");
    }
}

# Add device.mb_asof if this schema predates it.
sub upgrade_add_device_asof {
    my $self = shift;
    unless ($self->column_type("device", "mb_asof")) {
        $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
    }
}

# Add device.weight (default 100) if this schema predates it.
sub upgrade_add_device_weight {
    my $self = shift;
    unless ($self->column_type("device", "weight")) {
        $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
    }
}

# Allow device.status = 'readonly' if the current CHECK does not.
sub upgrade_add_device_readonly {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("device", "status", \$cn);
    return if defined $clause && $clause =~ /readonly/;
    $self->_pg_replace_status_check("device", $cn, qw(alive dead down readonly));
}

# Allow device.status = 'drain' (and 'readonly') if the current CHECK does not.
sub upgrade_add_device_drain {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("device", "status", \$cn);
    return if defined $clause && $clause =~ /drain/;
    $self->_pg_replace_status_check("device", $cn, qw(alive dead down readonly drain));
}

# Allow host.status = 'readonly' if the current CHECK does not.
sub upgrade_add_host_readonly {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("host", "status", \$cn);
    return if defined $clause && $clause =~ /\breadonly\b/;
    $self->_pg_replace_status_check("host", $cn, qw(alive dead down readonly));
}

# Widen server_settings.value to TEXT unless it already is TEXT.
sub upgrade_modify_server_settings_value {
    my $self = shift;
    my $type = $self->column_type("server_settings", "value");
    unless (defined $type && $type =~ /text/i) {
        $self->dowell("ALTER TABLE server_settings ALTER COLUMN value TYPE TEXT");
    }
}

# Add file_to_queue.arg if this schema predates it.
sub upgrade_add_file_to_queue_arg {
    my $self = shift;
    unless ($self->column_type("file_to_queue", "arg")) {
        $self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT");
    }
}

# Postgres doesn't have or never used a MEDIUMINT for device.
sub upgrade_modify_device_size {
    return 1;
}

# Add class.hashtype if this schema predates it.
sub upgrade_add_class_hashtype {
    my ($self) = @_;
    unless ($self->column_type("class", "hashtype")) {
        $self->dowell("ALTER TABLE class ADD COLUMN hashtype SMALLINT");
    }
}

# Run one statement ($sql with @binds) in its own transaction.
# Returns 1 if it succeeded (committed) and 0 if it failed with a
# duplicate-key error (rolled back and otherwise ignored).  Any other error
# is rolled back and then thrown, so no transaction is left open.
sub _pg_do_in_txn_ignoring_dup {
    my ($self, $sql, @binds) = @_;
    my $dbh = $self->dbh;

    $dbh->begin_work;
    $self->condthrow;

    my $ok = eval { $dbh->do($sql, undef, @binds); 1 };
    my $eval_error = $@;
    if ($ok && !$dbh->err) {
        $dbh->commit;
        $self->condthrow;
        return 1;
    }

    # Capture the failure now: rollback() resets the handle's error state.
    my $is_dup = $self->was_duplicate_error;
    my $error  = $dbh->errstr || $eval_error || 'unknown error';
    $dbh->rollback;
    return 0 if $is_dup;
    croak("Database error: $error");
}

# Queue each fid in @fidids for deletion.  Fids already queued are skipped
# (Postgres lacks INSERT IGNORE, so duplicates are caught per row).
# Return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";

    foreach my $fidid (@fidids) {
        $self->_pg_do_in_txn_ignoring_dup($sql, $fidid);
    }
    return 1;
}

# Queue each fid in @fidids in file_to_delete2 with nexttry = now.  Fids
# already queued get their nexttry reset to now instead (REPLACE emulation).
# Return 1 on success. die otherwise.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    my $tbl = 'file_to_delete2';
    my $sql1 = sprintf "INSERT INTO %s (fid, nexttry) VALUES (?,%s)", $tbl, $self->unix_timestamp;
    my $sql2 = sprintf 'UPDATE %s SET nexttry = %s WHERE fid = ?', $tbl, $self->unix_timestamp;

    my @dup_fids;
    foreach my $fidid (@fidids) {
        push @dup_fids, $fidid
            unless $self->_pg_do_in_txn_ignoring_dup($sql1, $fidid);
    }

    foreach my $fidid (@dup_fids) {
        $self->_pg_do_in_txn_ignoring_dup($sql2, $fidid);
    }
    return 1;
}

# --------------------------------------------------------------------------
# Functions specific to Store::Postgres subclass. Not in parent.
# --------------------------------------------------------------------------
# Insert a row, doing nothing if it would violate a unique constraint.
# Takes insert => $sql and insert_vals => \@binds.  Returns 1.
sub insert_or_ignore {
    my $self = shift;
    my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
    return $self->insert_or_update(
        insert      => $arg{insert},
        insert_vals => $arg{insert_vals},
        update      => 'IGNORE',
        update_vals => 'IGNORE',
    );
}

# Upsert emulation.  Run `insert` with `insert_vals`.  If that fails with a
# duplicate-key error, run `update` with `update_vals` instead; passing
# update => 'IGNORE' skips the update.
#
# Everything happens in one transaction.  A savepoint lets the failed INSERT
# be undone without aborting the whole transaction.  Returns 1.  On any
# other error the transaction is rolled back and the error rethrown.
sub insert_or_update {
    my $self = shift;
    my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
    my $dbh = $self->dbh;
    # This method opens the transaction itself, so a fixed savepoint name is
    # enough.  Deriving it from the INSERT text breaks on multi-line or
    # lower-case SQL.
    my $savepoint = 'insert_or_update';

    $dbh->begin_work;
    my $ok = eval {
        $dbh->do("SAVEPOINT $savepoint");
        eval {
            $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
        };
        if ($@ || $dbh->err) {
            if ($self->was_duplicate_error) {
                $dbh->do("ROLLBACK TO $savepoint");
                if ($arg{update} ne "IGNORE") {
                    $dbh->do($arg{update}, undef, @{ $arg{update_vals} });
                }
            }
            $self->condthrow;
        }
        $dbh->commit;
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error';
        eval { $dbh->rollback };
        die $error;
    }
    return 1;
}

# Return information_schema.columns.data_type for $table.$col (for example
# "integer", "bigint" or "text"), or undef if the column does not exist.
sub column_type {
    my ($self, $table, $col) = @_;
    my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
    $sth->execute($table,$col);
    while (my $rec = $sth->fetchrow_hashref) {
        if ($rec->{column_name} eq $col) {
            $sth->finish;
            return $rec->{data_type};
        }
    }
    return undef;
}

# Return the check_clause of a CHECK constraint on $table.$col, or undef if
# none is found.  If $cn (a scalar ref) is given, the constraint's name is
# stored through it.
sub column_constraint {
    my ($self, $table, $col, $cn) = @_;
    my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause,constraint_name FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
    $sth->execute($table,$col);
    while (my $rec = $sth->fetchrow_hashref) {
        if ($rec->{column_name} eq $col) {
            $sth->finish;
            $$cn = $rec->{constraint_name} if $cn;
            return $rec->{check_clause};
        }
    }
    return undef;
}

# Decide (and cache) whether file ids are "BIGINT" or "INT":
#   1. MOG_FIDSIZE=big in the environment forces BIGINT;
#   2. otherwise follow an existing file.fid column;
#   3. otherwise default to BIGINT.
sub fid_type {
    my $self = shift;
    return $self->{_fid_type} if $self->{_fid_type};

    # let people force bigint mode with environment.
    if ($ENV{MOG_FIDSIZE} && $ENV{MOG_FIDSIZE} eq "big") {
        return $self->{_fid_type} = "BIGINT";
    }

    # else, check a maybe-existing table and see if we're in bigint
    # mode already.
    my $file_fid_type = $self->column_type("file", "fid");
    if ($file_fid_type) {
        if ($file_fid_type =~ /bigint/i) {
            return $self->{_fid_type} = "BIGINT";
        } elsif ($file_fid_type =~ /int/i) {
            # Old installs might not have raised the fid type size yet.
            return $self->{_fid_type} = "INT";
        }
    }

    # Used to default to 32bit ints, but this always bites people
    # a few years down the road. So default to 64bit.
    return $self->{_fid_type} = "BIGINT";
}

# --------------------------------------------------------------------------
# Test suite things we override
# --------------------------------------------------------------------------

# Test helper: drop and recreate a scratch database (default
# "tmp_mogiletest") by running ../mogdbsetup relative to the running
# script's directory, then return a store connected to it.
sub new_temp {
    my $self = shift;
    my %args = @_;
    my $dbname   = $args{dbname} || "tmp_mogiletest";
    my $host     = $args{dbhost} || 'localhost';
    my $port     = $args{dbport} || 5432;
    my $user     = $args{dbuser} || 'mogile';
    my $pass     = $args{dbpass} || '';
    my $rootuser = $args{dbrootuser} || $args{dbuser} || 'postgres';
    my $rootpass = $args{dbrootpass} || $args{dbpass} || '';
    _drop_db($dbname,$host,$port,$rootuser,$rootpass);

    # $FindBin::Bin is only populated once FindBin has been loaded.
    require FindBin;
    my @args = ( "$FindBin::Bin/../mogdbsetup", "--yes",
                 "--dbname=$dbname", "--type=Postgres",
                 "--dbhost=$host", "--dbport=$port",
                 "--dbuser=$user",
                 "--dbrootuser=$rootuser", );
    push @args, "--dbpass=$pass" unless $pass eq '';
    push @args, "--dbrootpass=$rootpass" unless $rootpass eq '';
    system(@args)
        and die "Failed to run mogdbsetup (".join(' ',map { "'".$_."'" } @args).").";

    return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname;host=$host;port=$port",
                                                   $user,
                                                   $pass);
}

# Cached administrative connection to the 'postgres' database.  Note that
# it is cached regardless of the arguments given on later calls.
my $rootdbh;
sub _root_dbh {
    my ($host, $port, $rootuser, $rootpass) = @_;
    # "or die" must bind to the connect; "return X or die" never reaches it.
    $rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres;host=$host;port=$port",
                              $rootuser, $rootpass, { RaiseError => 1 })
        or die "Couldn't connect to local PostgreSQL database as $rootuser";
    return $rootdbh;
}

# Best-effort DROP DATABASE; errors (for example a missing database) are
# ignored.
sub _drop_db {
    my ($dbname, $host, $port, $rootuser, $rootpass) = @_;
    my $root_dbh = _root_dbh($host, $port, $rootuser, $rootpass);
    eval {
        $root_dbh->do("DROP DATABASE $dbname");
    };
}


# --------------------------------------------------------------------------
# Data-access things we override
# --------------------------------------------------------------------------

# Change the key of file $fidid to $to_key.
# Returns 1 on success, 0 on a duplicate-key error, and dies on any other
# database error.
# TODO: need a test to hit the duplicate name error condition
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        # $@ is empty when RaiseError is off; fall back to the DBI error
        # rather than dying with a bare "Died".
        die($@ || $dbh->errstr);
    }
    $self->condthrow;
    return 1;
}

# Add a record of $fidid existing on $devid.
# Returns 1 on success and 0 if that (fid, devid) row already exists.
# Any other database error is rethrown instead of being reported as 0.
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
    };

    return 1 if !$@ && !$dbh->err;
    return 0 if $self->was_duplicate_error;
    die($@ || $dbh->errstr);
}

# Run $code inside a transaction on the store's handle.  On success the
# transaction is committed and $code's scalar result returned.  If $code (or
# the commit) dies, the transaction is rolled back and the error rethrown,
# so a failure never leaves a transaction open on the handle.
sub _pg_in_txn {
    my ($self, $code) = @_;
    my $dbh = $self->dbh;

    $dbh->begin_work;
    $self->condthrow;

    my $result;
    my $ok = eval {
        $result = $code->();
        $dbh->commit;
        $self->condthrow;
        1;
    };
    unless ($ok) {
        my $error = $@ || 'unknown error';
        eval { $dbh->rollback };
        die $error;
    }
    return $result;
}

# Recompute file.devcount for $fidid from the rows in file_on.
# The file row is locked FOR UPDATE first, so concurrent recounts of the
# same fid serialize.  Returns 1 if no such file exists, otherwise the
# UPDATE's row count.
sub update_devcount_atomic {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;

    return $self->_pg_in_txn(sub {
        my $locked = $dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
        $self->condthrow;
        # No such file (DBI reports zero rows as "0E0"): nothing to update.
        return 1 if $locked == 0;

        my $rv = $dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?",
                          undef, $fidid, $fidid);
        $self->condthrow;
        return $rv;
    });
}

# Enqueue a fidid for replication, from a specific deviceid (can be undef),
# in $in seconds (0/undef stores nexttry = 0).  A fid that is already queued
# is left alone (INSERT IGNORE emulation): returns 0 in that case, otherwise
# the INSERT's result.  Other database errors die.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    my $dbh = $self->dbh;

    # The delay is bound as a placeholder rather than interpolated into SQL.
    # Callers are assumed to pass whole seconds; int() drops any fraction.
    my $nexttry = '0';
    my @nexttry_binds;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + ?";
        @nexttry_binds = (int($in));
    }

    my $rv = eval {
        $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
                 undef, $fidid, $from_devid, @nexttry_binds);
    };
    if ($@ || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        die($@ || $dbh->errstr);
    }
    return $rv;
}

# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;
    return $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." WHERE nexttry > ".$self->unix_timestamp);
}

# Push $fid's next replication attempt $in_n_secs seconds into the future
# and bump its failcount.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?",
                   undef, $in_n_secs, $fid);
}

# creates a new domain, given a domain namespace string. return the dmid on success,
# throw 'dup' on duplicate name.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}

# Set server setting $key to $val (upsert), or delete it when $val is undef.
# Returns 1; dies on a database error.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;

    if (defined $val) {
        $self->insert_or_update(
            insert      => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
            insert_vals => [ $key, $val ],
            update      => "UPDATE server_settings SET value = ? WHERE field = ?",
            update_vals => [ $val, $key ],
        );
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}

# Increment numeric server setting $key by $val (default 1; 0 is a no-op).
# This implementation is race-safe: the row is locked FOR UPDATE while it is
# read and rewritten.  A non-numeric old value is logged and replaced by
# $val; a missing (or false) value is created via set_server_setting.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    my $dbh = $self->dbh;
    $dbh->begin_work;
    # 1 = row updated, 0 = nothing usable to update, undef = the eval died.
    my $updated = eval {
        my $value = $dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",
                                          undef, $key);
        return 0 unless $value;
        if ($value =~ /^\d+$/) {
            $value += $val;
        } else {
            error("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
            $value = $val;
        }
        return $dbh->do("UPDATE server_settings ".
                        "SET value=? ".
                        "WHERE field=?", undef,
                        $value, $key) > 0 ? 1 : 0;
    };
    unless (defined $updated) {
        my $error = $@ || 'unknown error';
        eval { $dbh->rollback };
        die $error;
    }
    if ($updated) {
        $dbh->commit;
        return 1;
    }
    $dbh->rollback; # Release the row-lock
    $self->set_server_setting($key, $val);
}

# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
                       $devid, $hostid, $status);
    });
    $self->condthrow;
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}

# Insert the file row for fidid, or overwrite every column of an existing
# row with the same fid (REPLACE INTO emulation).
sub replace_into_file {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
    $self->insert_or_update(
        insert      => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, ?)",
        insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'} ],
        update      => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=? WHERE fid=?",
        update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'devcount', 'fidid'} ],
    );
    $self->condthrow;
}

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present).  Postgres has no
# multi-row INSERT IGNORE here, so rows are inserted one at a time.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
    foreach my $devfid (@devfids) {
        eval {
            $sth->execute($devfid->fidid, $devfid->devid);
        };
        $self->condthrow unless $self->was_duplicate_error;
    }
    return 1;
}

# Map a lock name to a non-negative 31-bit integer: the first 4 bytes of
# its MD5 digest with the top bit cleared.  Croaks on an empty name.
sub lockid {
    my ($lockname) = @_;
    croak("Called with empty lockname! $lockname") unless (defined $lockname && length($lockname) > 0);
    my $num = unpack 'N',md5($lockname);
    return ($num & 0x7fffffff);
}

# Advisory-lock key pair for $lockname.  The first key is a fixed namespace
# shared by every tracker, and the second is derived from the lock name.
# Keying the first value on the local hostname would let trackers on
# different hosts hold the same named lock at once.
sub _pg_lock_keys {
    my ($lockname) = @_;
    return (lockid('MogileFS::Store::Postgres'), lockid($lockname));
}

# attempt to grab a lock of lockname, and timeout after timeout seconds.
# the lock should be unique in the space of (lockid), as well the space of
# (hostname,pid).
# returns 1 on success and 0 on timeout
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    my ($nsid, $lockid) = _pg_lock_keys($lockname);
    die sprintf("Lock recursion detected (grabbing %s on %s (%s/%s), had %s (%s). Bailing out.",
                $lockname, hostname(), $nsid, $lockid,
                $self->{last_lock}, lockid($self->{last_lock}))
        if $self->{lock_depth};

    debug("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;

    my $lock = undef;
    while ($timeout >= 0) {
        $lock = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $nsid, $lockid);
        $self->condthrow;
        die "Something went horribly wrong while getting lock $lockname - undefined lock"
            unless defined $lock;
        if ($lock == 1) {
            $self->{lock_depth} = 1;
            $self->{last_lock}  = $lockname;
            last;
        }
        die "Something went horribly wrong while getting lock $lockname - unknown return value"
            unless $lock == 0;
        # Held elsewhere: wait a second and retry until the timeout is used up.
        sleep 1 if $timeout > 0;
        $timeout--;
    }
    return $lock;
}

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    my ($nsid, $lockid) = _pg_lock_keys($lockname);
    debug("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
    my $rv = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?, ?)", undef, $nsid, $lockid);
    debug("Double-release of lock $lockname!") if $self->{lock_depth} != 0 and $rv == 0 and $Mgd::DEBUG >= 2;
    $self->condthrow;
    $self->{lock_depth} = 0;
    return $rv;
}

sub BLOB_BIND_TYPE { { pg_type => PG_BYTEA } }

# Store $checksum (bound as bytea) with $hashtype for $fidid, overwriting
# any existing checksum row for that fid.  Returns 1; dies on error after
# rolling the transaction back.
sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;

    return $self->_pg_in_txn(sub {
        # A failed INSERT aborts the transaction; the savepoint lets us
        # rewind past it and run the UPDATE in the same transaction.
        $dbh->do('SAVEPOINT set_checksum');
        eval {
            my $sth = $dbh->prepare("INSERT INTO checksum " .
                                    "(fid, hashtype, checksum) ".
                                    "VALUES (?, ?, ?)");
            $sth->bind_param(1, $fidid);
            $sth->bind_param(2, $hashtype);
            $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
            $sth->execute;
        };
        if ($@ || $dbh->err) {
            # Anything other than "row already exists" is a real failure.
            die($@ || $dbh->errstr) unless $self->was_duplicate_error;
            $dbh->do('ROLLBACK TO SAVEPOINT set_checksum');
            my $sth = $dbh->prepare("UPDATE checksum " .
                                    "SET hashtype = ?, checksum = ? " .
                                    "WHERE fid = ?");
            $sth->bind_param(1, $hashtype);
            $sth->bind_param(2, $checksum, BLOB_BIND_TYPE);
            $sth->bind_param(3, $fidid);
            $sth->execute;
            $self->condthrow;
        }
        return 1;
    });
}

1;

__END__

=head1 NAME

MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS

=head1 SEE ALSO

L<MogileFS::Store>
