1package MogileFS::Store; 2use strict; 3use warnings; 4use Carp qw(croak confess); 5use MogileFS::Util qw(throw max error); 6use DBI; # no reason a Store has to be DBI-based, but for now they all are. 7use List::Util qw(shuffle); 8 9# this is incremented whenever the schema changes. server will refuse 10# to start-up with an old schema version 11# 12# 6: adds file_to_replicate table 13# 7: adds file_to_delete_later table 14# 8: adds fsck_log table 15# 9: adds 'drain' state to enum in device table 16# 10: adds 'replpolicy' column to 'class' table 17# 11: adds 'file_to_queue' table 18# 12: adds 'file_to_delete2' table 19# 13: modifies 'server_settings.value' to TEXT for wider values 20# also adds a TEXT 'arg' column to file_to_queue for passing arguments 21# 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB 22# 15: adds checksum table, adds 'hashtype' column to 'class' table 23# 16: no-op, see 17 24# 17: adds 'readonly' state to enum in host table 25use constant SCHEMA_VERSION => 17; 26 27sub new { 28 my ($class) = @_; 29 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles)); 30} 31 32sub new_from_dsn_user_pass { 33 my ($class, $dsn, $user, $pass, $max_handles) = @_; 34 my $subclass; 35 if ($dsn =~ /^DBI:mysql:/i) { 36 $subclass = "MogileFS::Store::MySQL"; 37 } elsif ($dsn =~ /^DBI:SQLite:/i) { 38 $subclass = "MogileFS::Store::SQLite"; 39 } elsif ($dsn =~ /^DBI:Oracle:/i) { 40 $subclass = "MogileFS::Store::Oracle"; 41 } elsif ($dsn =~ /^DBI:Pg:/i) { 42 $subclass = "MogileFS::Store::Postgres"; 43 } else { 44 die "Unknown database type: $dsn"; 45 } 46 unless (eval "use $subclass; 1") { 47 die "Error loading $subclass: $@\n"; 48 } 49 my $self = bless { 50 dsn => $dsn, 51 user => $user, 52 pass => $pass, 53 max_handles => $max_handles, # Max number of handles to allow 54 raise_errors => $subclass->want_raise_errors, 55 slave_list_version => 0, 56 slave_list_cache => [], 57 recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested 58 recheck_done_gen => 0, # once recheck is done, copy of what the request generation was 59 handles_left => 0, # amount of times this handle can still be verified 60 connected_slaves => {}, 61 dead_slaves => {}, 62 dead_backoff => {}, # how many times in a row a slave has died 63 connect_timeout => 10, # High default. 64 }, $subclass; 65 $self->init; 66 return $self; 67} 68 69# Defaults to true now. 70sub want_raise_errors { 71 1; 72} 73 74sub new_from_mogdbsetup { 75 my ($class, %args) = @_; 76 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass 77 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport}); 78 79 my $try_make_sto = sub { 80 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, { 81 PrintError => 0, 82 }) or return undef; 83 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass}); 84 $sto->raise_errors; 85 return $sto; 86 }; 87 88 # upgrading, apparently, as this database already exists. 89 my $sto = $try_make_sto->(); 90 return $sto if $sto; 91 92 # otherwise, we need to make the requested database, setup permissions, etc 93 $class->status("couldn't connect to database as mogilefs user. trying root..."); 94 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport}); 95 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, { 96 PrintError => 0, 97 }) or 98 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n"; 99 $class->status("connected to database as root user."); 100 101 $class->confirm("Create/Upgrade database name '$args{dbname}'?"); 102 $class->create_db_if_not_exists($rdbh, $args{dbname}); 103 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?"); 104 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass}); 105 106 # should be ready now: 107 $sto = $try_make_sto->(); 108 return $sto if $sto; 109 110 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user."; 111} 112 113# given a root DBI connection, create the named database. succeed 114# if it it's made, or already exists. die otherwise. 115sub create_db_if_not_exists { 116 my ($pkg, $rdbh, $dbname) = @_; 117 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname") 118 or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n"; 119} 120 121sub grant_privileges { 122 my ($pkg, $rdbh, $dbname, $user, $pass) = @_; 123 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?", 124 undef, $pass) 125 or die "Failed to grant privileges: " . $rdbh->errstr . "\n"; 126 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?", 127 undef, $pass) 128 or die "Failed to grant privileges: " . $rdbh->errstr . "\n"; 129} 130 131sub can_replace { 0 } 132sub can_insertignore { 0 } 133sub can_insert_multi { 0 } 134sub can_for_update { 1 } 135 136sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." } 137 138sub ignore_replace { 139 my $self = shift; 140 return "INSERT IGNORE " if $self->can_insertignore; 141 return "REPLACE " if $self->can_replace; 142 die "Can't INSERT IGNORE or REPLACE?"; 143} 144 145my $on_status = sub {}; 146my $on_confirm = sub { 1 }; 147sub on_status { my ($pkg, $code) = @_; $on_status = $code; }; 148sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; }; 149sub status { my ($pkg, $msg) = @_; $on_status->($msg); }; 150sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; }; 151 152sub latest_schema_version { SCHEMA_VERSION } 153 154sub raise_errors { 155 my $self = shift; 156 $self->{raise_errors} = 1; 157 $self->dbh->{RaiseError} = 1; 158} 159 160sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; } 161 162sub dsn { $_[0]{dsn} } 163sub user { $_[0]{user} } 164sub pass { $_[0]{pass} } 165 166sub connect_timeout { $_[0]{connect_timeout} } 167 168sub init { 1 } 169sub post_dbi_connect { 1 } 170 171sub can_do_slaves { 0 } 172 173sub mark_as_slave { 174 my $self = shift; 175 die "Incapable of becoming slave." unless $self->can_do_slaves; 176 177 $self->{is_slave} = 1; 178} 179 180sub is_slave { 181 my $self = shift; 182 return $self->{is_slave}; 183} 184 185sub _slaves_list_changed { 186 my $self = shift; 187 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0; 188 if ($ver <= $self->{slave_list_version}) { 189 return 0; 190 } 191 $self->{slave_list_version} = $ver; 192 # Restart connections from scratch if the configuration changed. 193 $self->{connected_slaves} = {}; 194 return 1; 195} 196 197# Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB. 198sub _slaves_list { 199 my $self = shift; 200 my $now = time(); 201 202 my $sk = MogileFS::Config->server_setting_cached('slave_keys') 203 or return (); 204 205 my @ret; 206 foreach my $key (split /\s*,\s*/, $sk) { 207 my $slave = MogileFS::Config->server_setting_cached("slave_$key"); 208 209 if (!$slave) { 210 error("key for slave DB config: slave_$key not found in configuration"); 211 next; 212 } 213 214 my ($dsn, $user, $pass) = split /\|/, $slave; 215 if (!defined($dsn) or !defined($user) or !defined($pass)) { 216 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring"); 217 next; 218 } 219 push @ret, [$dsn, $user, $pass] 220 } 221 222 return @ret; 223} 224 225sub _pick_slave { 226 my $self = shift; 227 my @temp = shuffle keys %{$self->{connected_slaves}}; 228 return unless @temp; 229 return $self->{connected_slaves}->{$temp[0]}; 230} 231 232sub _connect_slave { 233 my $self = shift; 234 my $slave_fulldsn = shift; 235 my $now = time(); 236 237 my $dead_retry = 238 MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15; 239 240 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0; 241 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]}; 242 return if (defined $dead_timeout 243 && $dead_timeout + ($dead_retry * $dead_backoff) > $now); 244 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]}); 245 246 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn); 247 $newslave->set_connect_timeout( 248 MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1); 249 $self->{slave}->{next_check} = 0; 250 $newslave->mark_as_slave; 251 if ($self->check_slave) { 252 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave; 253 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0; 254 } else { 255 # Magic numbers are saddening... 256 $dead_backoff++ unless $dead_backoff > 20; 257 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now; 258 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff; 259 } 260} 261 262sub get_slave { 263 my $self = shift; 264 265 die "Incapable of having slaves." unless $self->can_do_slaves; 266 267 $self->{slave} = undef; 268 foreach my $slave (keys %{$self->{dead_slaves}}) { 269 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}}; 270 unless ($full_dsn) { 271 delete $self->{dead_slaves}->{$slave}; 272 next; 273 } 274 $self->_connect_slave($full_dsn); 275 } 276 277 unless ($self->_slaves_list_changed) { 278 if ($self->{slave} = $self->_pick_slave) { 279 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen}; 280 return $self->{slave} if $self->check_slave; 281 } 282 } 283 284 if ($self->{slave}) { 285 my $dsn = $self->{slave}->{dsn}; 286 $self->{dead_slaves}->{$dsn} = time(); 287 $self->{dead_backoff}->{$dsn} = 0; 288 delete $self->{connected_slaves}->{$dsn}; 289 error("Error talking to slave: $dsn"); 290 } 291 my @slaves_list = $self->_slaves_list; 292 293 # If we have no slaves, then return silently. 294 return unless @slaves_list; 295 296 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering'); 297 298 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') { 299 MogileFS::run_global_hook('slave_list_filter', \@slaves_list); 300 } 301 302 $self->{slave_list_cache} = \@slaves_list; 303 304 foreach my $slave_fulldsn (@slaves_list) { 305 $self->_connect_slave($slave_fulldsn); 306 } 307 308 if ($self->{slave} = $self->_pick_slave) { 309 return $self->{slave}; 310 } 311 warn "Slave list exhausted, failing back to master."; 312 return; 313} 314 315sub read_store { 316 my $self = shift; 317 318 return $self unless $self->can_do_slaves; 319 320 if ($self->{slave_ok}) { 321 if (my $slave = $self->get_slave) { 322 return $slave; 323 } 324 } 325 326 return $self; 327} 328 329sub slaves_ok { 330 my $self = shift; 331 my $coderef = shift; 332 333 return unless ref $coderef eq 'CODE'; 334 335 local $self->{slave_ok} = 1; 336 337 return $coderef->(@_); 338} 339 340sub recheck_dbh { 341 my $self = shift; 342 $self->{recheck_req_gen}++; 343} 344 345sub dbh { 346 my $self = shift; 347 348 if ($self->{dbh}) { 349 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) { 350 $self->{dbh} = undef unless $self->{dbh}->ping; 351 # Handles a memory leak under Solaris/Postgres. 352 # We may leak a little extra memory if we're holding a lock, 353 # since dropping a connection mid-lock is fatal 354 $self->{dbh} = undef if ($self->{max_handles} && 355 $self->{handles_left}-- < 0 && !$self->{lock_depth}); 356 $self->{recheck_done_gen} = $self->{recheck_req_gen}; 357 } 358 return $self->{dbh} if $self->{dbh}; 359 } 360 361 # Shortcut flag: if monitor thinks the master is down, avoid attempting to 362 # connect to it for now. If we already have a connection to the master, 363 # keep using it as above. 364 if (!$self->is_slave) { 365 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0); 366 return if (defined $flag && $flag == 0);; 367 } 368 369 # auto-reconnect is unsafe if we're holding a lock 370 if ($self->{lock_depth}) { 371 die "DB connection recovery unsafe, lock held: $self->{last_lock}"; 372 } 373 374 eval { 375 local $SIG{ALRM} = sub { die "timeout\n" }; 376 alarm($self->connect_timeout); 377 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, { 378 PrintError => 0, 379 AutoCommit => 1, 380 # FUTURE: will default to on (have to validate all callers first): 381 RaiseError => ($self->{raise_errors} || 0), 382 sqlite_use_immediate_transaction => 1, 383 }); 384 }; 385 alarm(0); 386 if ($@ eq "timeout\n") { 387 die "Failed to connect to database: timeout"; 388 } elsif ($@) { 389 die "Failed to connect to database: " . DBI->errstr; 390 } 391 $self->post_dbi_connect; 392 $self->{handles_left} = $self->{max_handles} if $self->{max_handles}; 393 return $self->{dbh}; 394} 395 396sub have_dbh { return 1 if $_[0]->{dbh}; } 397 398sub ping { 399 my $self = shift; 400 return $self->dbh->ping; 401} 402 403sub condthrow { 404 my ($self, $optmsg) = @_; 405 my $dbh = $self->dbh; 406 return 1 unless $dbh->err; 407 my ($pkg, $fn, $line) = caller; 408 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr; 409 $msg .= ": $optmsg" if $optmsg; 410 # Auto rollback failures around transactions. 411 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; } 412 croak($msg); 413} 414 415sub dowell { 416 my ($self, $sql, @do_params) = @_; 417 my $rv = eval { $self->dbh->do($sql, @do_params) }; 418 return $rv unless $@ || $self->dbh->err; 419 warn "Error with SQL: $sql\n"; 420 Carp::confess($@ || $self->dbh->errstr); 421} 422 423sub _valid_params { 424 croak("Odd number of parameters!") if scalar(@_) % 2; 425 my ($self, $vlist, %uarg) = @_; 426 my %ret; 427 $ret{$_} = delete $uarg{$_} foreach @$vlist; 428 croak("Bogus options: ".join(',',keys %uarg)) if %uarg; 429 return %ret; 430} 431 432sub was_deadlock_error { 433 my $self = shift; 434 my $dbh = $self->dbh; 435 die "UNIMPLEMENTED"; 436} 437 438sub was_duplicate_error { 439 my $self = shift; 440 my $dbh = $self->dbh; 441 die "UNIMPLEMENTED"; 442} 443 444# run a subref (presumably a database update) in an eval, because you expect it to 445# maybe fail on duplicate key error, and throw a dup exception for you, else return 446# its return value 447sub conddup { 448 my ($self, $code) = @_; 449 my $rv = eval { $code->(); }; 450 throw("dup") if $self->was_duplicate_error; 451 croak($@) if $@; 452 return $rv; 453} 454 455# insert row if doesn't already exist 456# WARNING: This function is NOT transaction safe if the duplicate errors causes 457# your transaction to halt! 458# WARNING: This function is NOT safe on multi-row inserts if can_insertignore 459# is false! Rows before the duplicate will be inserted, but rows after the 460# duplicate might not be, depending your database. 461sub insert_ignore { 462 my ($self, $sql, @params) = @_; 463 my $dbh = $self->dbh; 464 if ($self->can_insertignore) { 465 return $dbh->do("INSERT IGNORE $sql", @params); 466 } else { 467 # TODO: Detect bad multi-row insert here. 468 my $rv = eval { $dbh->do("INSERT $sql", @params); }; 469 if ($@ || $dbh->err) { 470 return 1 if $self->was_duplicate_error; 471 # This chunk is identical to condthrow, but we include it directly 472 # here as we know there is definitely an error, and we would like 473 # the caller of this function. 474 my ($pkg, $fn, $line) = caller; 475 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr; 476 croak($msg); 477 } 478 return $rv; 479 } 480} 481 482sub retry_on_deadlock { 483 my $self = shift; 484 my $code = shift; 485 my $tries = shift || 3; 486 croak("deadlock retries must be positive") if $tries < 1; 487 my $rv; 488 489 while ($tries-- > 0) { 490 $rv = eval { $code->(); }; 491 next if ($self->was_deadlock_error); 492 croak($@) if $@; 493 last; 494 } 495 return $rv; 496} 497 498# -------------------------------------------------------------------------- 499 500my @extra_tables; 501 502sub add_extra_tables { 503 my $class = shift; 504 push @extra_tables, @_; 505} 506 507use constant TABLES => qw( domain class file tempfile file_to_delete 508 unreachable_fids file_on file_on_corrupt host 509 device server_settings file_to_replicate 510 file_to_delete_later fsck_log file_to_queue 511 file_to_delete2 checksum); 512 513sub setup_database { 514 my $sto = shift; 515 516 my $curver = $sto->schema_version; 517 518 my $latestver = SCHEMA_VERSION; 519 if ($curver == $latestver) { 520 $sto->status("Schema already up-to-date at version $curver."); 521 return 1; 522 } 523 524 if ($curver > $latestver) { 525 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n"; 526 } 527 528 if ($curver) { 529 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?"); 530 } 531 532 foreach my $t (TABLES, @extra_tables) { 533 $sto->create_table($t); 534 } 535 536 $sto->upgrade_add_host_getport; 537 $sto->upgrade_add_host_altip; 538 $sto->upgrade_add_device_asof; 539 $sto->upgrade_add_device_weight; 540 $sto->upgrade_add_device_readonly; 541 $sto->upgrade_add_device_drain; 542 $sto->upgrade_add_class_replpolicy; 543 $sto->upgrade_modify_server_settings_value; 544 $sto->upgrade_add_file_to_queue_arg; 545 $sto->upgrade_modify_device_size; 546 $sto->upgrade_add_class_hashtype; 547 $sto->upgrade_add_host_readonly; 548 549 return 1; 550} 551 552sub cached_schema_version { 553 my $self = shift; 554 return $self->{_cached_schema_version} ||= 555 $self->schema_version; 556} 557 558sub schema_version { 559 my $self = shift; 560 my $dbh = $self->dbh; 561 return eval { 562 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0; 563 } || 0; 564} 565 566sub filter_create_sql { my ($self, $sql) = @_; return $sql; } 567 568sub create_table { 569 my ($self, $table) = @_; 570 my $dbh = $self->dbh; 571 return 1 if $self->table_exists($table); 572 my $meth = "TABLE_$table"; 573 my $sql = $self->$meth; 574 $sql = $self->filter_create_sql($sql); 575 $self->status("Running SQL: $sql;"); 576 $dbh->do($sql) or 577 die "Failed to create table $table: " . $dbh->errstr; 578 my $imeth = "INDEXES_$table"; 579 my @indexes = eval { $self->$imeth }; 580 foreach $sql (@indexes) { 581 $self->status("Running SQL: $sql;"); 582 $dbh->do($sql) or 583 die "Failed to create indexes on $table: " . $dbh->errstr; 584 } 585} 586 587# Please try to keep all tables aligned nicely 588# with '"CREATE TABLE' on the first line 589# and ')"' alone on the last line. 590 591sub TABLE_domain { 592 # classes are tied to domains. domains can have classes of items 593 # with different mindevcounts. 594 # 595 # a minimum devcount is the number of copies the system tries to 596 # maintain for files in that class 597 # 598 # unspecified classname means classid=0 (implicit class), and that 599 # implies mindevcount=2 600 "CREATE TABLE domain ( 601 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY, 602 namespace VARCHAR(255), 603 UNIQUE (namespace) 604 )" 605} 606 607sub TABLE_class { 608 "CREATE TABLE class ( 609 dmid SMALLINT UNSIGNED NOT NULL, 610 classid TINYINT UNSIGNED NOT NULL, 611 PRIMARY KEY (dmid,classid), 612 classname VARCHAR(50), 613 UNIQUE (dmid,classname), 614 mindevcount TINYINT UNSIGNED NOT NULL, 615 hashtype TINYINT UNSIGNED 616 )" 617} 618 619# the length field is only here for easy verifications of content 620# integrity when copying around. no sums or content types or other 621# metadata here. application can handle that. 622# 623# classid is what class of file this belongs to. for instance, on fotobilder 624# there will be a class for original pictures (the ones the user uploaded) 625# and a class for derived images (scaled down versions, thumbnails, greyscale, etc) 626# each domain can setup classes and assign the minimum redundancy level for 627# each class. fotobilder will use a 2 or 3 minimum copy redundancy for original 628# photos and and a 1 minimum for derived images (which means the sole device 629# for a derived image can die, bringing devcount to 0 for that file, but 630# the application can recreate it from its original) 631sub TABLE_file { 632 "CREATE TABLE file ( 633 fid INT UNSIGNED NOT NULL, 634 PRIMARY KEY (fid), 635 636 dmid SMALLINT UNSIGNED NOT NULL, 637 dkey VARCHAR(255), # domain-defined 638 UNIQUE dkey (dmid, dkey), 639 640 length BIGINT UNSIGNED, # big limit 641 642 classid TINYINT UNSIGNED NOT NULL, 643 devcount TINYINT UNSIGNED NOT NULL, 644 INDEX devcount (dmid,classid,devcount) 645 )" 646} 647 648sub TABLE_tempfile { 649 "CREATE TABLE tempfile ( 650 fid INT UNSIGNED NOT NULL AUTO_INCREMENT, 651 PRIMARY KEY (fid), 652 653 createtime INT UNSIGNED NOT NULL, 654 classid TINYINT UNSIGNED NOT NULL, 655 dmid SMALLINT UNSIGNED NOT NULL, 656 dkey VARCHAR(255), 657 devids VARCHAR(60) 658 )" 659} 660 661# files marked for death when their key is overwritten. then they get a new 662# fid, but since the old row (with the old fid) had to be deleted immediately, 663# we need a place to store the fid so an async job can delete the file from 664# all devices. 665sub TABLE_file_to_delete { 666 "CREATE TABLE file_to_delete ( 667 fid INT UNSIGNED NOT NULL, 668 PRIMARY KEY (fid) 669 )" 670} 671 672# if the replicator notices that a fid has no sources, that file gets inserted 673# into the unreachable_fids table. it is up to the application to actually 674# handle fids stored in this table. 675sub TABLE_unreachable_fids { 676 "CREATE TABLE unreachable_fids ( 677 fid INT UNSIGNED NOT NULL, 678 lastupdate INT UNSIGNED NOT NULL, 679 PRIMARY KEY (fid), 680 INDEX (lastupdate) 681 )" 682} 683 684# what files are on what devices? (most likely physical devices, 685# as logical devices of RAID arrays would be costly, and mogilefs 686# already handles redundancy) 687# 688# the devid index lets us answer "What files were on this now-dead disk?" 689sub TABLE_file_on { 690 "CREATE TABLE file_on ( 691 fid INT UNSIGNED NOT NULL, 692 devid MEDIUMINT UNSIGNED NOT NULL, 693 PRIMARY KEY (fid, devid), 694 INDEX (devid) 695 )" 696} 697 698# if application or framework detects an error in one of the duplicate files 699# for whatever reason, it can register its complaint and the framework 700# will do some verifications and fix things up w/ an async job 701# MAYBE: let application tell us the SHA1/MD5 of the file for us to check 702# on the other devices? 703sub TABLE_file_on_corrupt { 704 "CREATE TABLE file_on_corrupt ( 705 fid INT UNSIGNED NOT NULL, 706 devid MEDIUMINT UNSIGNED NOT NULL, 707 PRIMARY KEY (fid, devid) 708 )" 709} 710 711# hosts (which contain devices...) 712sub TABLE_host { 713 "CREATE TABLE host ( 714 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY, 715 716 status ENUM('alive','dead','down'), 717 http_port MEDIUMINT UNSIGNED DEFAULT 7500, 718 http_get_port MEDIUMINT UNSIGNED, 719 720 hostname VARCHAR(40), 721 hostip VARCHAR(15), 722 altip VARCHAR(15), 723 altmask VARCHAR(18), 724 UNIQUE (hostname), 725 UNIQUE (hostip), 726 UNIQUE (altip) 727 )" 728} 729 730# disks... 731sub TABLE_device { 732 "CREATE TABLE device ( 733 devid MEDIUMINT UNSIGNED NOT NULL, 734 hostid MEDIUMINT UNSIGNED NOT NULL, 735 736 status ENUM('alive','dead','down'), 737 weight MEDIUMINT DEFAULT 100, 738 739 mb_total INT UNSIGNED, 740 mb_used INT UNSIGNED, 741 mb_asof INT UNSIGNED, 742 PRIMARY KEY (devid), 743 INDEX (status) 744 )" 745} 746 747sub TABLE_server_settings { 748 "CREATE TABLE server_settings ( 749 field VARCHAR(50) PRIMARY KEY, 750 value TEXT 751 )" 752} 753 754sub TABLE_file_to_replicate { 755 # nexttry is time to try to replicate it next. 756 # 0 means immediate. it's only on one host. 757 # 1 means lower priority. it's on 2+ but isn't happy where it's at. 758 # unix timestamp means at/after that time. some previous error occurred. 759 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever. 760 # failcount. how many times we've failed, just for doing backoff of nexttry. 761 # flags. reserved for future use. 762 "CREATE TABLE file_to_replicate ( 763 fid INT UNSIGNED NOT NULL PRIMARY KEY, 764 nexttry INT UNSIGNED NOT NULL, 765 INDEX (nexttry), 766 fromdevid INT UNSIGNED, 767 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0, 768 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0 769 )" 770} 771 772sub TABLE_file_to_delete_later { 773 "CREATE TABLE file_to_delete_later ( 774 fid INT UNSIGNED NOT NULL PRIMARY KEY, 775 delafter INT UNSIGNED NOT NULL, 776 INDEX (delafter) 777 )" 778} 779 780sub TABLE_fsck_log { 781 "CREATE TABLE fsck_log ( 782 logid INT UNSIGNED NOT NULL AUTO_INCREMENT, 783 PRIMARY KEY (logid), 784 utime INT UNSIGNED NOT NULL, 785 fid INT UNSIGNED NULL, 786 evcode CHAR(4), 787 devid MEDIUMINT UNSIGNED, 788 INDEX(utime) 789 )" 790} 791 792# generic queue table, designed to be used for workers/jobs which aren't 793# constantly in use, and are async to the user. 794# ie; fsck, drain, rebalance. 795sub TABLE_file_to_queue { 796 "CREATE TABLE file_to_queue ( 797 fid INT UNSIGNED NOT NULL, 798 devid INT UNSIGNED, 799 type TINYINT UNSIGNED NOT NULL, 800 nexttry INT UNSIGNED NOT NULL, 801 failcount TINYINT UNSIGNED NOT NULL default '0', 802 flags SMALLINT UNSIGNED NOT NULL default '0', 803 arg TEXT, 804 PRIMARY KEY (fid, type), 805 INDEX type_nexttry (type,nexttry) 806 )" 807} 808 809# new style async delete table. 810# this is separate from file_to_queue since deletes are more actively used, 811# and partitioning on 'type' doesn't always work so well. 812sub TABLE_file_to_delete2 { 813 "CREATE TABLE file_to_delete2 ( 814 fid INT UNSIGNED NOT NULL PRIMARY KEY, 815 nexttry INT UNSIGNED NOT NULL, 816 failcount TINYINT UNSIGNED NOT NULL default '0', 817 INDEX nexttry (nexttry) 818 )" 819} 820 821sub TABLE_checksum { 822 "CREATE TABLE checksum ( 823 fid INT UNSIGNED NOT NULL PRIMARY KEY, 824 hashtype TINYINT UNSIGNED NOT NULL, 825 checksum VARBINARY(64) NOT NULL 826 )" 827} 828 829# these five only necessary for MySQL, since no other database existed 830# before, so they can just create the tables correctly to begin with. 831# in the future, there might be new alters that non-MySQL databases 832# will have to implement. 833sub upgrade_add_host_getport { 1 } 834sub upgrade_add_host_altip { 1 } 835sub upgrade_add_device_asof { 1 } 836sub upgrade_add_device_weight { 1 } 837sub upgrade_add_device_readonly { 1 } 838sub upgrade_add_device_drain { die "Not implemented in $_[0]" } 839sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" } 840sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" } 841sub upgrade_modify_device_size { die "Not implemented in $_[0]" } 842 843sub upgrade_add_class_replpolicy { 844 my ($self) = @_; 845 unless ($self->column_type("class", "replpolicy")) { 846 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)"); 847 } 848} 849 850sub upgrade_add_class_hashtype { 851 my ($self) = @_; 852 unless ($self->column_type("class", "hashtype")) { 853 $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED"); 854 } 855} 856 857# return true if deleted, 0 if didn't exist, exception if error 858sub delete_host { 859 my ($self, $hostid) = @_; 860 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid); 861} 862 863# return true if deleted, 0 if didn't exist, exception if error 864sub delete_domain { 865 my ($self, $dmid) = @_; 866 my ($err, $rv); 867 my $dbh = $self->dbh; 868 eval { 869 $dbh->begin_work; 870 if ($self->domain_has_files($dmid)) { 871 $err = "has_files"; 872 } elsif ($self->domain_has_classes($dmid)) { 873 $err = "has_classes"; 874 } else { 875 $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid); 876 877 # remove the "default" class if one was created (for mindevcount) 878 # this is currently the only way to delete the "default" class 879 $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid); 880 $dbh->commit; 881 } 882 $dbh->rollback if $err; 883 }; 884 $self->condthrow; # will rollback on errors 885 throw($err) if $err; 886 return $rv; 887} 888 889sub domain_has_files { 890 my ($self, $dmid) = @_; 891 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1', 892 undef, $dmid); 893 return $has_a_fid ? 1 : 0; 894} 895 896sub domain_has_classes { 897 my ($self, $dmid) = @_; 898 # queryworker does not permit removing default class, so domain_has_classes 899 # should not register the default class 900 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1', 901 undef, $dmid); 902 return defined($has_a_class); 903} 904 905sub class_has_files { 906 my ($self, $dmid, $clid) = @_; 907 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1', 908 undef, $dmid, $clid); 909 return $has_a_fid ? 1 : 0; 910} 911 912# return new classid on success (non-zero integer), die on failure 913# throw 'dup' on duplicate name 914sub create_class { 915 my ($self, $dmid, $classname) = @_; 916 my $dbh = $self->dbh; 917 918 my ($clsid, $rv); 919 920 eval { 921 $dbh->begin_work; 922 if ($classname eq 'default') { 923 $clsid = 0; 924 } else { 925 # get the max class id in this domain 926 my $maxid = $dbh->selectrow_array 927 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0; 928 $clsid = $maxid + 1; 929 } 930 # now insert the new class 931 $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)", 932 undef, $dmid, $clsid, $classname, 2); 933 $dbh->commit if $rv; 934 }; 935 if ($@ || $dbh->err) { 936 if ($self->was_duplicate_error) { 937 # ensure we're not inside a transaction 938 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; } 939 throw("dup"); 940 } 941 } 942 $self->condthrow; # this will rollback on errors 943 return $clsid if $rv; 944 die; 945} 946 947# return 1 on success, throw "dup" on duplicate name error, die otherwise 948sub update_class_name { 949 my $self = shift; 950 my %arg = $self->_valid_params([qw(dmid classid classname)], @_); 951 my $rv = eval { 952 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?", 953 undef, $arg{classname}, $arg{dmid}, $arg{classid}); 954 }; 955 throw("dup") if $self->was_duplicate_error; 956 $self->condthrow; 957 return 1; 958} 959 960# return 1 on success, die otherwise 961sub update_class_mindevcount { 962 my $self = shift; 963 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_); 964 eval { 965 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?", 966 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid}); 967 }; 968 $self->condthrow; 969 return 1; 970} 971 972# return 1 on success, die otherwise 973sub update_class_replpolicy { 974 my $self = shift; 975 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_); 976 eval { 977 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?", 978 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid}); 979 }; 980 $self->condthrow; 981 return 1; 982} 983 984# return 1 on success, die otherwise 985sub update_class_hashtype { 986 my $self = shift; 987 my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_); 988 eval { 989 $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?", 990 undef, $arg{hashtype}, $arg{dmid}, $arg{classid}); 991 }; 992 $self->condthrow; 993} 994 995sub nfiles_with_dmid_classid_devcount { 996 my ($self, $dmid, $classid, $devcount) = @_; 997 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?', 998 undef, $dmid, $classid, $devcount); 999} 1000 1001sub set_server_setting { 1002 my ($self, $key, $val) = @_; 1003 my $dbh = $self->dbh; 1004 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace; 1005 1006 eval { 1007 if (defined $val) { 1008 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val); 1009 } else { 1010 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key); 1011 } 1012 }; 1013 1014 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err; 1015 return 1; 1016} 1017 1018# FIXME: racy. currently the only caller doesn't matter, but should be fixed. 1019sub incr_server_setting { 1020 my ($self, $key, $val) = @_; 1021 $val = 1 unless defined $val; 1022 return unless $val; 1023 1024 return 1 if $self->dbh->do("UPDATE server_settings ". 1025 "SET value=value+? ". 1026 "WHERE field=?", undef, 1027 $val, $key) > 0; 1028 $self->set_server_setting($key, $val); 1029} 1030 1031sub server_setting { 1032 my ($self, $key) = @_; 1033 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?", 1034 undef, $key); 1035} 1036 1037sub server_settings { 1038 my ($self) = @_; 1039 my $ret = {}; 1040 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings"); 1041 $sth->execute; 1042 while (my ($k, $v) = $sth->fetchrow_array) { 1043 $ret->{$k} = $v; 1044 } 1045 return $ret; 1046} 1047 1048# register a tempfile and return the fidid, which should be allocated 1049# using autoincrement/sequences if the passed in fid is undef. however, 1050# if fid is passed in, that value should be used and returned. 1051# 1052# return new/passed in fidid on success. 1053# throw 'dup' if fid already in use 1054# return 0/undef/die on failure 1055# 1056sub register_tempfile { 1057 my $self = shift; 1058 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_); 1059 1060 my $dbh = $self->dbh; 1061 my $fid = $arg{fid}; 1062 1063 my $explicit_fid_used = $fid ? 1 : 0; 1064 1065 # setup the new mapping. we store the devices that we picked for 1066 # this file in here, knowing that they might not be used. create_close 1067 # is responsible for actually mapping in file_on. NOTE: fid is being 1068 # passed in, it's either some number they gave us, or it's going to be 1069 # 0/undef which translates into NULL which means to automatically create 1070 # one. that should be fine. 1071 my $ins_tempfile = sub { 1072 my $rv = eval { 1073 # We must only pass the correct number of bind parameters 1074 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on 1075 # Postgres, where you are expected to leave it out or use DEFAULT 1076 # Leaving it out seems sanest and least likely to cause problems 1077 # with other databases. 1078 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime'); 1079 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp); 1080 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids}); 1081 # Do not check for $explicit_fid_used, but rather $fid directly 1082 # as this anonymous sub is called from the loop later 1083 if($fid) { 1084 unshift @keys, 'fid'; 1085 unshift @vars, '?'; 1086 unshift @vals, $fid; 1087 } 1088 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")"; 1089 $dbh->do($sql, undef, @vals); 1090 }; 1091 if (!$rv) { 1092 return undef if $self->was_duplicate_error; 1093 die "Unexpected db error into tempfile: " . $dbh->errstr; 1094 } 1095 1096 unless (defined $fid) { 1097 # if they did not give us a fid, then we want to grab the one that was 1098 # theoretically automatically generated 1099 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid') 1100 or die "No last_insert_id found"; 1101 } 1102 return undef unless defined $fid && $fid > 0; 1103 return 1; 1104 }; 1105 1106 unless ($ins_tempfile->()) { 1107 throw("dup") if $explicit_fid_used; 1108 die "tempfile insert failed"; 1109 } 1110 1111 my $fid_in_use = sub { 1112 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid); 1113 return $exists ? 1 : 0; 1114 }; 1115 1116 # See notes in MogileFS::Config->check_database 1117 my $min_fidid = MogileFS::Config->config('min_fidid'); 1118 1119 # if the fid is in use, do something 1120 while ($fid_in_use->($fid) || $fid <= $min_fidid) { 1121 throw("dup") if $explicit_fid_used; 1122 1123 # be careful of databases which reset their 1124 # auto-increment/sequences when the table is empty (InnoDB 1125 # did/does this, for instance). So check if it's in use, and 1126 # re-seed the table with the highest known fid from the file 1127 # table. 1128 1129 # get the highest fid from the filetable and insert a dummy row 1130 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file"); 1131 $ins_tempfile->(); # don't care about its result 1132 1133 # then do a normal auto-increment 1134 $fid = undef; 1135 $ins_tempfile->() or die "register_tempfile failed after seeding"; 1136 } 1137 1138 return $fid; 1139} 1140 1141# return hashref of row containing columns "fid, dmid, dkey, length, 1142# classid, devcount" provided a $dmid and $key (dkey). or undef if no 1143# row. 1144sub file_row_from_dmid_key { 1145 my ($self, $dmid, $key) = @_; 1146 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ". 1147 "FROM file WHERE dmid=? AND dkey=?", 1148 undef, $dmid, $key); 1149} 1150 1151# return hashref of row containing columns "fid, dmid, dkey, length, 1152# classid, devcount" provided a $fidid or undef if no row. 1153sub file_row_from_fidid { 1154 my ($self, $fidid) = @_; 1155 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ". 1156 "FROM file WHERE fid=?", 1157 undef, $fidid); 1158} 1159 1160# return an arrayref of rows containing columns "fid, dmid, dkey, length, 1161# classid, devcount" provided a pair of $fidid or undef if no rows. 1162sub file_row_from_fidid_range { 1163 my ($self, $fromfid, $count) = @_; 1164 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ". 1165 "FROM file WHERE fid > ? LIMIT ?"); 1166 $sth->execute($fromfid,$count); 1167 return $sth->fetchall_arrayref({}); 1168} 1169 1170# return array of devids that a fidid is on 1171sub fid_devids { 1172 my ($self, $fidid) = @_; 1173 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?", 1174 undef, $fidid) || [] }; 1175} 1176 1177# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids 1178sub fid_devids_multiple { 1179 my ($self, @fidids) = @_; 1180 my $in = join(",", map { $_+0 } @fidids); 1181 my $ret = {}; 1182 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)"); 1183 $sth->execute; 1184 while (my ($fidid, $devid) = $sth->fetchrow_array) { 1185 push @{$ret->{$fidid} ||= []}, $devid; 1186 } 1187 return $ret; 1188} 1189 1190# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef 1191sub tempfile_row_from_fid { 1192 my ($self, $fidid) = @_; 1193 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ". 1194 "FROM tempfile WHERE fid=?", 1195 undef, $fidid); 1196} 1197 1198# return 1 on success, throw "dup" on duplicate devid or throws other error on failure 1199sub create_device { 1200 my ($self, $devid, $hostid, $status) = @_; 1201 my $rv = $self->conddup(sub { 1202 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef, 1203 $devid, $hostid, $status); 1204 }); 1205 $self->condthrow; 1206 die "error making device $devid\n" unless $rv > 0; 1207 return 1; 1208} 1209 1210sub update_device { 1211 my ($self, $devid, $to_update) = @_; 1212 my @keys = sort keys %$to_update; 1213 return unless @keys; 1214 $self->conddup(sub { 1215 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys) 1216 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys), 1217 $devid); 1218 }); 1219 return 1; 1220} 1221 1222sub update_device_usage { 1223 my $self = shift; 1224 my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_); 1225 eval { 1226 $self->dbh->do("UPDATE device SET ". 1227 "mb_total = ?, mb_used = ?, mb_asof = ?" . 1228 " WHERE devid = ?", 1229 undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof}, 1230 $arg{devid}); 1231 }; 1232 $self->condthrow; 1233} 1234 1235# MySQL has an optimized version 1236sub update_device_usages { 1237 my ($self, $updates, $cb) = @_; 1238 foreach my $upd (@$updates) { 1239 $self->update_device_usage(%$upd); 1240 $cb->(); 1241 } 1242} 1243 1244# This is unimplemented at the moment as we must verify: 1245# - no file_on rows exist 1246# - nothing in file_to_queue is going to attempt to use it 1247# - nothing in file_to_replicate is going to attempt to use it 1248# - it's already been marked dead 1249# - that all trackers are likely to know this :/ 1250# - ensure the devid can't be reused 1251# IE; the user can't mark it dead then remove it all at once and cause their 1252# cluster to implode. 1253sub delete_device { 1254 die "Unimplemented; needs further testing"; 1255} 1256 1257sub set_device_weight { 1258 my ($self, $devid, $weight) = @_; 1259 eval { 1260 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid); 1261 }; 1262 $self->condthrow; 1263} 1264 1265sub set_device_state { 1266 my ($self, $devid, $state) = @_; 1267 eval { 1268 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid); 1269 }; 1270 $self->condthrow; 1271} 1272 1273sub delete_class { 1274 my ($self, $dmid, $cid) = @_; 1275 throw("has_files") if $self->class_has_files($dmid, $cid); 1276 eval { 1277 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid); 1278 }; 1279 $self->condthrow; 1280} 1281 1282# called from a queryworker process, will trigger delete_fidid_enqueued 1283# in the delete worker 1284sub delete_fidid { 1285 my ($self, $fidid) = @_; 1286 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); }; 1287 $self->condthrow; 1288 $self->enqueue_for_delete2($fidid, 0); 1289 $self->condthrow; 1290} 1291 1292# Only called from delete workers (after delete_fidid), 1293# this reduces client-visible latency from the queryworker 1294sub delete_fidid_enqueued { 1295 my ($self, $fidid) = @_; 1296 eval { $self->delete_checksum($fidid); }; 1297 $self->condthrow; 1298 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); }; 1299 $self->condthrow; 1300} 1301 1302sub delete_tempfile_row { 1303 my ($self, $fidid) = @_; 1304 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); }; 1305 $self->condthrow; 1306 return $rv; 1307} 1308 1309# Load the specified tempfile, then delete it. If we succeed, we were 1310# here first; otherwise, someone else beat us here (and we return undef) 1311sub delete_and_return_tempfile_row { 1312 my ($self, $fidid) = @_; 1313 my $rv = $self->tempfile_row_from_fid($fidid); 1314 my $rows_deleted = $self->delete_tempfile_row($fidid); 1315 return $rv if ($rows_deleted > 0); 1316} 1317 1318sub replace_into_file { 1319 my $self = shift; 1320 my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_); 1321 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace; 1322 eval { 1323 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ". 1324 "VALUES (?,?,?,?,?,?) ", undef, 1325 @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'}); 1326 }; 1327 $self->condthrow; 1328} 1329 1330# returns 1 on success, 0 on duplicate key error, dies on exception 1331# TODO: need a test to hit the duplicate name error condition 1332# TODO: switch to using "dup" exception here? 1333sub rename_file { 1334 my ($self, $fidid, $to_key) = @_; 1335 my $dbh = $self->dbh; 1336 eval { 1337 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?', 1338 undef, $to_key, $fidid); 1339 }; 1340 if ($@ || $dbh->err) { 1341 # first is MySQL's error code for duplicates 1342 if ($self->was_duplicate_error) { 1343 return 0; 1344 } else { 1345 die $@; 1346 } 1347 } 1348 $self->condthrow; 1349 return 1; 1350} 1351 1352sub get_domainid_by_name { 1353 my $self = shift; 1354 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?', 1355 undef, $_[0]); 1356 return $dmid; 1357} 1358 1359# returns a hash of domains. Key is namespace, value is dmid. 1360sub get_all_domains { 1361 my ($self) = @_; 1362 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain'); 1363 return map { ($_->[0], $_->[1]) } @{$domains || []}; 1364} 1365 1366sub get_classid_by_name { 1367 my $self = shift; 1368 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?', 1369 undef, $_[0], $_[1]); 1370 return $classid; 1371} 1372 1373# returns an array of hashrefs, one hashref per row in the 'class' table 1374sub get_all_classes { 1375 my ($self) = @_; 1376 my (@ret, $row); 1377 1378 my @cols = qw/dmid classid classname mindevcount/; 1379 if ($self->cached_schema_version >= 10) { 1380 push @cols, 'replpolicy'; 1381 if ($self->cached_schema_version >= 15) { 1382 push @cols, 'hashtype'; 1383 } 1384 } 1385 my $cols = join(', ', @cols); 1386 my $sth = $self->dbh->prepare("SELECT $cols FROM class"); 1387 $sth->execute; 1388 push @ret, $row while $row = $sth->fetchrow_hashref; 1389 return @ret; 1390} 1391 1392# add a record of fidid existing on devid 1393# returns 1 on success, 0 on duplicate 1394sub add_fidid_to_devid { 1395 my ($self, $fidid, $devid) = @_; 1396 croak("fidid not non-zero") unless $fidid; 1397 croak("devid not non-zero") unless $devid; 1398 1399 # TODO: This should possibly be insert_ignore instead 1400 # As if we are adding an extra file_on entry, we do not want to replace the 1401 # exist one. Check REPLACE semantics. 1402 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)", 1403 undef, $fidid, $devid); 1404 return 1 if $rv > 0; 1405 return 0; 1406} 1407 1408# remove a record of fidid existing on devid 1409# returns 1 on success, 0 if not there anyway 1410sub remove_fidid_from_devid { 1411 my ($self, $fidid, $devid) = @_; 1412 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?", 1413 undef, $fidid, $devid); }; 1414 $self->condthrow; 1415 return $rv; 1416} 1417 1418# Test if host exists. 1419sub get_hostid_by_id { 1420 my $self = shift; 1421 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?', 1422 undef, $_[0]); 1423 return $hostid; 1424} 1425 1426sub get_hostid_by_name { 1427 my $self = shift; 1428 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?', 1429 undef, $_[0]); 1430 return $hostid; 1431} 1432 1433# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents. 1434sub get_all_hosts { 1435 my ($self) = @_; 1436 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " . 1437 "hostip, http_port, http_get_port, altip, altmask FROM host"); 1438 $sth->execute; 1439 my @ret; 1440 while (my $row = $sth->fetchrow_hashref) { 1441 push @ret, $row; 1442 } 1443 return @ret; 1444} 1445 1446# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents. 1447sub get_all_devices { 1448 my ($self) = @_; 1449 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " . 1450 "mb_used, mb_asof, status, weight FROM device"); 1451 $self->condthrow; 1452 $sth->execute; 1453 my @return; 1454 while (my $row = $sth->fetchrow_hashref) { 1455 push @return, $row; 1456 } 1457 return @return; 1458} 1459 1460# update the device count for a given fidid 1461sub update_devcount { 1462 my ($self, $fidid) = @_; 1463 my $dbh = $self->dbh; 1464 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?", 1465 undef, $fidid); 1466 1467 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef, 1468 $ct, $fidid); }; 1469 $self->condthrow; 1470 1471 return 1; 1472} 1473 1474# update the classid for a given fidid 1475sub update_classid { 1476 my ($self, $fidid, $classid) = @_; 1477 my $dbh = $self->dbh; 1478 1479 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef, 1480 $classid, $fidid); 1481 1482 $self->condthrow; 1483 return 1; 1484} 1485 1486# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds. 1487sub enqueue_for_replication { 1488 my ($self, $fidid, $from_devid, $in) = @_; 1489 1490 my $nexttry = 0; 1491 if ($in) { 1492 $nexttry = $self->unix_timestamp . " + " . int($in); 1493 } 1494 1495 $self->retry_on_deadlock(sub { 1496 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ". 1497 "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid); 1498 }); 1499} 1500 1501# enqueue a fidid for delete 1502# note: if we get one more "independent" queue like this, the 1503# code should be collapsable? I tried once and it looked too ugly, so we have 1504# some redundancy. 1505sub enqueue_for_delete2 { 1506 my ($self, $fidid, $in) = @_; 1507 1508 $in = 0 unless $in; 1509 my $nexttry = $self->unix_timestamp . " + " . int($in); 1510 1511 $self->retry_on_deadlock(sub { 1512 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ". 1513 "VALUES (?,$nexttry)", undef, $fidid); 1514 }); 1515} 1516 1517# enqueue a fidid for work 1518sub enqueue_for_todo { 1519 my ($self, $fidid, $type, $in) = @_; 1520 1521 $in = 0 unless $in; 1522 my $nexttry = $self->unix_timestamp . " + " . int($in); 1523 1524 $self->retry_on_deadlock(sub { 1525 if (ref($fidid)) { 1526 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ". 1527 "nexttry) VALUES (?,?,?,?,$nexttry)", undef, 1528 $fidid->[0], $fidid->[1], $fidid->[2], $type); 1529 } else { 1530 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ". 1531 "VALUES (?,?,$nexttry)", undef, $fidid, $type); 1532 } 1533 }); 1534} 1535 1536# return 1 on success. die otherwise. 1537sub enqueue_many_for_todo { 1538 my ($self, $fidids, $type, $in) = @_; 1539 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) { 1540 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids; 1541 return 1; 1542 } 1543 1544 $in = 0 unless $in; 1545 my $nexttry = $self->unix_timestamp . " + " . int($in); 1546 1547 # TODO: convert to prepared statement? 1548 $self->retry_on_deadlock(sub { 1549 if (ref($fidids->[0]) eq 'ARRAY') { 1550 my $sql = $self->ignore_replace . 1551 "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ". 1552 join(', ', ('(?,?,?,?,?)') x scalar @$fidids); 1553 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids); 1554 } else { 1555 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type, 1556 nexttry) VALUES " . 1557 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids)); 1558 } 1559 }); 1560 $self->condthrow; 1561} 1562 1563# For file_to_queue queues that should be kept small, find the size. 1564# This isn't fast, but for small queues won't be slow, and is usually only ran 1565# from a single tracker. 1566sub file_queue_length { 1567 my $self = shift; 1568 my $type = shift; 1569 1570 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " . 1571 "WHERE type = ?", undef, $type); 1572} 1573 1574# reschedule all deferred replication, return number rescheduled 1575sub replicate_now { 1576 my ($self) = @_; 1577 1578 $self->retry_on_deadlock(sub { 1579 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . 1580 " WHERE nexttry > " . $self->unix_timestamp); 1581 }); 1582} 1583 1584# takes two arguments, devid and limit, both required. returns an arrayref of fidids. 1585sub get_fidids_by_device { 1586 my ($self, $devid, $limit) = @_; 1587 1588 my $dbh = $self->dbh; 1589 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit", 1590 undef, $devid); 1591 return $fidids; 1592} 1593 1594# finds a chunk of fids given a set of constraints: 1595# devid, fidid, age (new or old), limit 1596# Note that if this function is very slow on your large DB, you're likely 1597# sorting by "newfiles" and are missing a new index. 1598# returns an arrayref of fidids 1599sub get_fidid_chunks_by_device { 1600 my ($self, %o) = @_; 1601 1602 my $dbh = $self->dbh; 1603 my $devid = delete $o{devid}; 1604 croak("must supply at least a devid") unless $devid; 1605 my $age = delete $o{age}; 1606 my $fidid = delete $o{fidid}; 1607 my $limit = delete $o{limit}; 1608 croak("invalid options: " . join(', ', keys %o)) if %o; 1609 # If supplied a "previous" fidid, we're paging through. 1610 my $fidsort = ''; 1611 my $order = ''; 1612 $age ||= 'old'; 1613 if ($age eq 'old') { 1614 $fidsort = 'AND fid > ?' if $fidid; 1615 $order = 'ASC'; 1616 } elsif ($age eq 'new') { 1617 $fidsort = 'AND fid < ?' if $fidid; 1618 $order = 'DESC'; 1619 } else { 1620 croak("invalid age argument: " . $age); 1621 } 1622 $limit ||= 100; 1623 my @extra = (); 1624 push @extra, $fidid if $fidid; 1625 1626 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " . 1627 $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra); 1628 return $fidids; 1629} 1630 1631# gets fidids above fidid_low up to (and including) fidid_high 1632sub get_fidids_between { 1633 my ($self, $fidid_low, $fidid_high, $limit) = @_; 1634 $limit ||= 1000; 1635 $limit = int($limit); 1636 1637 my $dbh = $self->dbh; 1638 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file 1639 WHERE fid > ? and fid <= ? 1640 ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high); 1641 return $fidids; 1642} 1643 1644# creates a new domain, given a domain namespace string. return the dmid on success, 1645# throw 'dup' on duplicate name. 1646# override if you want a less racy version. 1647sub create_domain { 1648 my ($self, $name) = @_; 1649 my $dbh = $self->dbh; 1650 1651 # get the max domain id 1652 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0; 1653 my $rv = eval { 1654 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)', 1655 undef, $maxid + 1, $name); 1656 }; 1657 if ($self->was_duplicate_error) { 1658 throw("dup"); 1659 } 1660 return $maxid+1 if $rv; 1661 die "failed to make domain"; # FIXME: the above is racy. 1662} 1663 1664sub update_host { 1665 my ($self, $hid, $to_update) = @_; 1666 my @keys = sort keys %$to_update; 1667 return unless @keys; 1668 $self->conddup(sub { 1669 $self->dbh->do("UPDATE host SET " . join('=?, ', @keys) 1670 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys), 1671 $hid); 1672 }); 1673 return 1; 1674} 1675 1676# return ne hostid, or throw 'dup' on error. 1677# NOTE: you need to put them into the initial 'down' state. 1678sub create_host { 1679 my ($self, $hostname, $ip) = @_; 1680 my $dbh = $self->dbh; 1681 # racy! lazy. no, better: portable! how often does this happen? :) 1682 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1; 1683 my $rv = $self->conddup(sub { 1684 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ". 1685 "VALUES (?, ?, ?, 'down')", 1686 undef, $hid, $hostname, $ip); 1687 }); 1688 return $hid if $rv; 1689 die "db failure"; 1690} 1691 1692# return array of row hashrefs containing columns: (fid, fromdevid, 1693# failcount, flags, nexttry) 1694sub files_to_replicate { 1695 my ($self, $limit) = @_; 1696 my $ut = $self->unix_timestamp; 1697 my $to_repl_map = $self->dbh->selectall_hashref(qq{ 1698 SELECT fid, fromdevid, failcount, flags, nexttry 1699 FROM file_to_replicate 1700 WHERE nexttry <= $ut 1701 ORDER BY nexttry 1702 LIMIT $limit 1703 }, "fid") or return (); 1704 return values %$to_repl_map; 1705} 1706 1707# "new" style queue consumption code. 1708# from within a transaction, fetch a limit of fids, 1709# then update each fid's nexttry to be off in the future, 1710# giving local workers some time to dequeue the items. 1711# Note: 1712# DBI (even with RaiseError) returns weird errors on 1713# deadlocks from selectall_hashref. So we can't do that. 1714# we also used to retry on deadlock within the routine, 1715# but instead lets return undef and let job_master retry. 1716sub grab_queue_chunk { 1717 my $self = shift; 1718 my $queue = shift; 1719 my $limit = shift; 1720 my $extfields = shift; 1721 1722 my $dbh = $self->dbh; 1723 my $tries = 3; 1724 my $work; 1725 1726 return 0 unless $self->lock_queue($queue); 1727 1728 my $extwhere = shift || ''; 1729 my $fields = 'fid, nexttry, failcount'; 1730 $fields .= ', ' . $extfields if $extfields; 1731 eval { 1732 $dbh->begin_work; 1733 my $ut = $self->unix_timestamp; 1734 my $query = qq{ 1735 SELECT $fields 1736 FROM $queue 1737 WHERE nexttry <= $ut 1738 $extwhere 1739 ORDER BY nexttry 1740 LIMIT $limit 1741 }; 1742 $query .= "FOR UPDATE\n" if $self->can_for_update; 1743 my $sth = $dbh->prepare($query); 1744 $sth->execute; 1745 $work = $sth->fetchall_hashref('fid'); 1746 # Nothing to work on. 1747 # Now claim the fids for a while. 1748 # TODO: Should be configurable... but not necessary. 1749 my $fidlist = join(',', keys %$work); 1750 unless ($fidlist) { $dbh->commit; return; } 1751 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)"); 1752 $dbh->commit; 1753 }; 1754 if ($self->was_deadlock_error) { 1755 eval { $dbh->rollback }; 1756 $work = undef; 1757 } else { 1758 $self->condthrow; 1759 } 1760 # FIXME: Super extra paranoia to prevent deadlocking. 1761 # Need to handle or die on all errors above, but $@ can get reset. For now 1762 # we'll just always ensure there's no transaction running at the end here. 1763 # A (near) release should figure the error detection correctly. 1764 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; } 1765 $self->unlock_queue($queue); 1766 1767 return defined $work ? values %$work : (); 1768} 1769 1770sub grab_files_to_replicate { 1771 my ($self, $limit) = @_; 1772 return $self->grab_queue_chunk('file_to_replicate', $limit, 1773 'fromdevid, flags'); 1774} 1775 1776sub grab_files_to_delete2 { 1777 my ($self, $limit) = @_; 1778 return $self->grab_queue_chunk('file_to_delete2', $limit); 1779} 1780 1781# $extwhere is ugly... but should be fine. 1782sub grab_files_to_queued { 1783 my ($self, $type, $what, $limit) = @_; 1784 $what ||= 'type, flags'; 1785 return $self->grab_queue_chunk('file_to_queue', $limit, 1786 $what, 'AND type = ' . $type); 1787} 1788 1789# although it's safe to have multiple tracker hosts and/or processes 1790# replicating the same file, around, it's inefficient CPU/time-wise, 1791# and it's also possible they pick different places and waste disk. 1792# so the replicator asks the store interface when it's about to start 1793# and when it's done replicating a fidid, so you can do something smart 1794# and tell it not to. 1795sub should_begin_replicating_fidid { 1796 my ($self, $fidid) = @_; 1797 my $lockname = "mgfs:fid:$fidid:replicate"; 1798 return 1 if $self->get_lock($lockname, 1); 1799 return 0; 1800} 1801 1802# called when replicator is done replicating a fid, so you can cleanup 1803# whatever you did in 'should_begin_replicating_fidid' above. 1804# 1805# NOTE: there's a theoretical race condition in the rebalance code, 1806# where (without locking as provided by 1807# should_begin_replicating_fidid/note_done_replicating), all copies of 1808# a file can be deleted by independent replicators doing rebalancing 1809# in different ways. so you'll probably want to implement some 1810# locking in this pair of functions. 1811sub note_done_replicating { 1812 my ($self, $fidid) = @_; 1813 my $lockname = "mgfs:fid:$fidid:replicate"; 1814 $self->release_lock($lockname); 1815} 1816 1817sub find_fid_from_file_to_replicate { 1818 my ($self, $fidid) = @_; 1819 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?", 1820 undef, $fidid); 1821} 1822 1823sub find_fid_from_file_to_delete2 { 1824 my ($self, $fidid) = @_; 1825 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?", 1826 undef, $fidid); 1827} 1828 1829sub find_fid_from_file_to_queue { 1830 my ($self, $fidid, $type) = @_; 1831 return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?", 1832 undef, $fidid, $type); 1833} 1834 1835sub delete_fid_from_file_to_replicate { 1836 my ($self, $fidid) = @_; 1837 $self->retry_on_deadlock(sub { 1838 $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid); 1839 }); 1840} 1841 1842sub delete_fid_from_file_to_queue { 1843 my ($self, $fidid, $type) = @_; 1844 $self->retry_on_deadlock(sub { 1845 $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?", 1846 undef, $fidid, $type); 1847 }); 1848} 1849 1850sub delete_fid_from_file_to_delete2 { 1851 my ($self, $fidid) = @_; 1852 $self->retry_on_deadlock(sub { 1853 $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid); 1854 }); 1855} 1856 1857sub reschedule_file_to_replicate_absolute { 1858 my ($self, $fid, $abstime) = @_; 1859 $self->retry_on_deadlock(sub { 1860 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?", 1861 undef, $abstime, $fid); 1862 }); 1863} 1864 1865sub reschedule_file_to_replicate_relative { 1866 my ($self, $fid, $in_n_secs) = @_; 1867 $self->retry_on_deadlock(sub { 1868 $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " . 1869 "failcount = failcount + 1 WHERE fid = ?", 1870 undef, $in_n_secs, $fid); 1871 }); 1872} 1873 1874sub reschedule_file_to_delete2_absolute { 1875 my ($self, $fid, $abstime) = @_; 1876 $self->retry_on_deadlock(sub { 1877 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?", 1878 undef, $abstime, $fid); 1879 }); 1880} 1881 1882sub reschedule_file_to_delete2_relative { 1883 my ($self, $fid, $in_n_secs) = @_; 1884 $self->retry_on_deadlock(sub { 1885 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " . 1886 "failcount = failcount + 1 WHERE fid = ?", 1887 undef, $in_n_secs, $fid); 1888 }); 1889} 1890 1891# Given a dmid prefix after and limit, return an arrayref of dkey from the file 1892# table. 1893sub get_keys_like { 1894 my ($self, $dmid, $prefix, $after, $limit) = @_; 1895 # fix the input... prefix always ends with a % so that it works 1896 # in a LIKE call, and after is either blank or something 1897 $prefix = '' unless defined $prefix; 1898 1899 # escape underscores, % and \ 1900 $prefix =~ s/([%\\_])/\\$1/g; 1901 1902 $prefix .= '%'; 1903 $after = '' unless defined $after; 1904 1905 my $like = $self->get_keys_like_operator; 1906 1907 # now select out our keys 1908 return $self->dbh->selectcol_arrayref 1909 ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " . 1910 "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after); 1911} 1912 1913sub get_keys_like_operator { return "LIKE"; } 1914 1915# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids]) 1916# that were created $secs_ago seconds ago or older. 1917sub old_tempfiles { 1918 my ($self, $secs_old) = @_; 1919 return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " . 1920 "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50"); 1921} 1922 1923# given an array of MogileFS::DevFID objects, mass-insert them all 1924# into file_on (ignoring if they're already present) 1925sub mass_insert_file_on { 1926 my ($self, @devfids) = @_; 1927 return 1 unless @devfids; 1928 1929 if (@devfids > 1 && ! $self->can_insert_multi) { 1930 $self->mass_insert_file_on($_) foreach @devfids; 1931 return 1; 1932 } 1933 1934 my (@qmarks, @binds); 1935 foreach my $df (@devfids) { 1936 my ($fidid, $devid) = ($df->fidid, $df->devid); 1937 Carp::croak("got a false fidid") unless $fidid; 1938 Carp::croak("got a false devid") unless $devid; 1939 push @binds, $fidid, $devid; 1940 push @qmarks, "(?,?)"; 1941 } 1942 1943 # TODO: This should possibly be insert_ignore instead 1944 # As if we are adding an extra file_on entry, we do not want to replace the 1945 # exist one. Check REPLACE semantics. 1946 $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds); 1947 return 1; 1948} 1949 1950sub set_schema_vesion { 1951 my ($self, $ver) = @_; 1952 $self->set_server_setting("schema_version", int($ver)); 1953} 1954 1955# returns array of fidids to try and delete again 1956sub fids_to_delete_again { 1957 my $self = shift; 1958 my $ut = $self->unix_timestamp; 1959 return @{ $self->dbh->selectcol_arrayref(qq{ 1960 SELECT fid 1961 FROM file_to_delete_later 1962 WHERE delafter < $ut 1963 LIMIT 500 1964 }) || [] }; 1965} 1966 1967# return 1 on success. die otherwise. 1968sub enqueue_fids_to_delete { 1969 my ($self, @fidids) = @_; 1970 # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub. 1971 # when the first row causes the duplicate error, and the remaining rows are 1972 # not processed. 1973 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) { 1974 $self->enqueue_fids_to_delete($_) foreach @fidids; 1975 return 1; 1976 } 1977 # TODO: convert to prepared statement? 1978 $self->retry_on_deadlock(sub { 1979 $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " . 1980 join(",", map { "(" . int($_) . ")" } @fidids)); 1981 }); 1982 $self->condthrow; 1983} 1984 1985sub enqueue_fids_to_delete2 { 1986 my ($self, @fidids) = @_; 1987 # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub. 1988 # when the first row causes the duplicate error, and the remaining rows are 1989 # not processed. 1990 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) { 1991 $self->enqueue_fids_to_delete2($_) foreach @fidids; 1992 return 1; 1993 } 1994 1995 my $nexttry = $self->unix_timestamp; 1996 1997 # TODO: convert to prepared statement? 1998 $self->retry_on_deadlock(sub { 1999 $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, 2000 nexttry) VALUES " . 2001 join(",", map { "(" . int($_) . ", $nexttry)" } @fidids)); 2002 }); 2003 $self->condthrow; 2004} 2005 2006# clears everything from the fsck_log table 2007# return 1 on success. die otherwise. 2008sub clear_fsck_log { 2009 my $self = shift; 2010 $self->dbh->do("DELETE FROM fsck_log"); 2011 return 1; 2012} 2013 2014# FIXME: Fsck log entries are processed a little out of order. 2015# Once a fsck has completed, the log should be re-summarized. 2016sub fsck_log_summarize { 2017 my $self = shift; 2018 2019 my $lockname = 'mgfs:fscksum'; 2020 my $lock = eval { $self->get_lock($lockname, 10) }; 2021 return 0 if defined $lock && $lock == 0; 2022 2023 my $logid = $self->max_fsck_logid; 2024 2025 # sum-up evcode counts every so often, to make fsck_status faster, 2026 # avoiding a potentially-huge GROUP BY in the future.. 2027 my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0; 2028 # both inclusive: 2029 my $min_logid = $self->server_setting("fsck_logid_processed") || 0; 2030 $min_logid++; 2031 my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :) 2032 while (my ($evcode, $ct) = each %$cts) { 2033 $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct); 2034 } 2035 $self->set_server_setting("fsck_logid_processed", $logid); 2036 2037 $self->release_lock($lockname) if $lock; 2038} 2039 2040sub fsck_log { 2041 my ($self, %opts) = @_; 2042 $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ". 2043 "VALUES (" . $self->unix_timestamp . ",?,?,?)", 2044 undef, 2045 delete $opts{fid}, 2046 delete $opts{code}, 2047 delete $opts{devid}); 2048 croak("Unknown opts") if %opts; 2049 $self->condthrow; 2050 2051 return 1; 2052} 2053 2054sub get_db_unixtime { 2055 my $self = shift; 2056 return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp); 2057} 2058 2059sub max_fidid { 2060 my $self = shift; 2061 return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file"); 2062} 2063 2064sub max_fsck_logid { 2065 my $self = shift; 2066 return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0; 2067} 2068 2069# returns array of $row hashrefs, from fsck_log table 2070sub fsck_log_rows { 2071 my ($self, $after_logid, $limit) = @_; 2072 $limit = int($limit || 100); 2073 $after_logid = int($after_logid || 0); 2074 2075 my @rows; 2076 my $sth = $self->dbh->prepare(qq{ 2077 SELECT logid, utime, fid, evcode, devid 2078 FROM fsck_log 2079 WHERE logid > ? 2080 ORDER BY logid 2081 LIMIT $limit 2082 }); 2083 $sth->execute($after_logid); 2084 my $row; 2085 push @rows, $row while $row = $sth->fetchrow_hashref; 2086 return @rows; 2087} 2088 2089sub fsck_evcode_counts { 2090 my ($self, %opts) = @_; 2091 my $timegte = delete $opts{time_gte}; 2092 my $logr = delete $opts{logid_range}; 2093 die if %opts; 2094 2095 my $ret = {}; 2096 my $sth; 2097 if ($timegte) { 2098 $sth = $self->dbh->prepare(qq{ 2099 SELECT evcode, COUNT(*) FROM fsck_log 2100 WHERE utime >= ? 2101 GROUP BY evcode 2102 }); 2103 $sth->execute($timegte||0); 2104 } 2105 if ($logr) { 2106 $sth = $self->dbh->prepare(qq{ 2107 SELECT evcode, COUNT(*) FROM fsck_log 2108 WHERE logid >= ? AND logid <= ? 2109 GROUP BY evcode 2110 }); 2111 $sth->execute($logr->[0], $logr->[1]); 2112 } 2113 while (my ($ev, $ct) = $sth->fetchrow_array) { 2114 $ret->{$ev} = $ct; 2115 } 2116 return $ret; 2117} 2118 2119# run before daemonizing. you can die from here if you see something's amiss. or emit 2120# warnings. 2121sub pre_daemonize_checks { 2122 my $self = shift; 2123 2124 $self->pre_daemonize_check_slaves; 2125} 2126 2127sub pre_daemonize_check_slaves { 2128 my $sk = MogileFS::Config->server_setting('slave_keys') 2129 or return; 2130 2131 my @slaves; 2132 foreach my $key (split /\s*,\s*/, $sk) { 2133 my $slave = MogileFS::Config->server_setting("slave_$key"); 2134 2135 if (!$slave) { 2136 error("key for slave DB config: slave_$key not found in configuration"); 2137 next; 2138 } 2139 2140 my ($dsn, $user, $pass) = split /\|/, $slave; 2141 if (!defined($dsn) or !defined($user) or !defined($pass)) { 2142 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring"); 2143 next; 2144 } 2145 push @slaves, [$dsn, $user, $pass] 2146 } 2147 2148 return unless @slaves; # Escape this block if we don't have a set of slaves anyways 2149 2150 MogileFS::run_global_hook('slave_list_check', \@slaves); 2151} 2152 2153 2154# attempt to grab a lock of lockname, and timeout after timeout seconds. 2155# returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding. 2156sub get_lock { 2157 my ($self, $lockname, $timeout) = @_; 2158 die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth}; 2159 die "get_lock not implemented for $self"; 2160} 2161 2162# attempt to release a lock of lockname. 2163# returns 1 on success and 0 if no lock we have has that name. 2164sub release_lock { 2165 my ($self, $lockname) = @_; 2166 die "release_lock not implemented for $self"; 2167} 2168 2169# MySQL has an issue where you either get excessive deadlocks, or INSERT's 2170# hang forever around some transactions. Use ghetto locking to cope. 2171sub lock_queue { 1 } 2172sub unlock_queue { 1 } 2173 2174sub BLOB_BIND_TYPE { undef; } 2175 2176sub set_checksum { 2177 my ($self, $fidid, $hashtype, $checksum) = @_; 2178 my $dbh = $self->dbh; 2179 die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace; 2180 2181 eval { 2182 my $sth = $dbh->prepare("REPLACE INTO checksum " . 2183 "(fid, hashtype, checksum) " . 2184 "VALUES (?, ?, ?)"); 2185 $sth->bind_param(1, $fidid); 2186 $sth->bind_param(2, $hashtype); 2187 $sth->bind_param(3, $checksum, BLOB_BIND_TYPE); 2188 $sth->execute; 2189 }; 2190 $self->condthrow; 2191} 2192 2193sub get_checksum { 2194 my ($self, $fidid) = @_; 2195 2196 $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " . 2197 "FROM checksum WHERE fid = ?", 2198 undef, $fidid); 2199} 2200 2201sub delete_checksum { 2202 my ($self, $fidid) = @_; 2203 2204 $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid); 2205} 2206 2207# setup the value used in a 'nexttry' field to indicate that this item will 2208# never actually be tried again and require some sort of manual intervention. 2209use constant ENDOFTIME => 2147483647; 2210 2211sub end_of_time { ENDOFTIME; } 2212 2213# returns the size of the non-urgent replication queue 2214# nexttry == 0 - the file is urgent 2215# nexttry != 0 && nexttry < ENDOFTIME - the file is deferred 2216sub deferred_repl_queue_length { 2217 my ($self) = @_; 2218 2219 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time); 2220} 2221 22221; 2223 2224__END__ 2225 2226=head1 NAME 2227 2228MogileFS::Store - data storage provider. base class. 2229 2230=head1 ABOUT 2231 2232MogileFS aims to be database-independent (though currently as of late 22332006 only works with MySQL). In the future, the server will create a 2234singleton instance of type "MogileFS::Store", like 2235L<MogileFS::Store::MySQL>, and all database interaction will be 2236through it. 2237 2238=head1 SEE ALSO 2239 2240L<MogileFS::Store::MySQL> 2241 2242 2243