1package MogileFS::Store;
2use strict;
3use warnings;
4use Carp qw(croak confess);
5use MogileFS::Util qw(throw max error);
6use DBI;  # no reason a Store has to be DBI-based, but for now they all are.
7use List::Util qw(shuffle);
8
9# this is incremented whenever the schema changes.  server will refuse
10# to start-up with an old schema version
11#
12# 6: adds file_to_replicate table
13# 7: adds file_to_delete_later table
14# 8: adds fsck_log table
15# 9: adds 'drain' state to enum in device table
16# 10: adds 'replpolicy' column to 'class' table
17# 11: adds 'file_to_queue' table
18# 12: adds 'file_to_delete2' table
19# 13: modifies 'server_settings.value' to TEXT for wider values
20#     also adds a TEXT 'arg' column to file_to_queue for passing arguments
21# 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22# 15: adds checksum table, adds 'hashtype' column to 'class' table
23# 16: no-op, see 17
24# 17: adds 'readonly' state to enum in host table
25use constant SCHEMA_VERSION => 17;
26
# Construct a store from the server's configured DSN, credentials and
# handle limit; driver dispatch happens in new_from_dsn_user_pass.
sub new {
    my ($class) = @_;
    my @conn_args = map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles);
    return $class->new_from_dsn_user_pass(@conn_args);
}
31
# Construct a driver-specific store subclass chosen from the DSN's DBI
# driver token.  Dies on an unrecognized driver or a failed module load.
sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass, $max_handles) = @_;

    # Map the DBI driver name (the token between the first two colons,
    # matched case-insensitively) to the Store subclass implementing it.
    my %subclass_of = (
        mysql  => "MogileFS::Store::MySQL",
        sqlite => "MogileFS::Store::SQLite",
        oracle => "MogileFS::Store::Oracle",
        pg     => "MogileFS::Store::Postgres",
    );
    my ($driver) = $dsn =~ /^DBI:([^:]+):/i;
    my $subclass = defined($driver) ? $subclass_of{lc $driver} : undef;
    die "Unknown database type: $dsn" unless $subclass;

    # Dynamically load the driver-specific subclass.
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
    }

    my $self = bless {
        dsn                => $dsn,
        user               => $user,
        pass               => $pass,
        max_handles        => $max_handles, # Max number of handles to allow
        raise_errors       => $subclass->want_raise_errors,
        slave_list_version => 0,
        slave_list_cache   => [],
        recheck_req_gen    => 0,  # bumped when a recheck of the dbh is requested
        recheck_done_gen   => 0,  # generation at which the last recheck completed
        handles_left       => 0,  # amount of times this handle can still be verified
        connected_slaves   => {},
        dead_slaves        => {},
        dead_backoff       => {}, # how many times in a row a slave has died
        connect_timeout    => 10, # High default.
    }, $subclass;
    $self->init;
    return $self;
}
68
69# Defaults to true now.
# Whether DBI RaiseError should be on for this store.  Defaults to true
# now; subclasses may override.
sub want_raise_errors { return 1 }
73
# Construct a store for mogdbsetup.  First tries to connect as the
# regular mogilefs user; failing that, connects as the database root
# user, creates the database and grants privileges, then retries.
sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    # where args is:  dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});

    # Probe connectivity with a raw DBI handle first so failure is a
    # quiet undef instead of a die from the Store constructor.
    my $try_make_sto = sub {
        my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return undef;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        $sto->raise_errors;
        return $sto;
    };

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;

    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user.  trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or
        die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    # confirm() dies ("Aborted.\n") if the operator declines.
    $class->confirm("Create/Upgrade database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
}
112
113# given a root DBI connection, create the named database.  succeed
114# if it it's made, or already exists.  die otherwise.
# Create the named database via a root DBI handle; succeeds if it is
# created or already exists, dies otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    my $create_sql = "CREATE DATABASE IF NOT EXISTS $dbname";
    $rdbh->do($create_sql)
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
}
120
# Grant the mogilefs user full rights on the database, both from any
# remote host ('%') and from localhost (treated separately by MySQL).
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    for my $host ("'\%'", "'localhost'") {
        $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@$host IDENTIFIED BY ?",
                  undef, $pass)
            or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    }
}
130
# Driver capability flags; subclasses override as appropriate.
sub can_replace      { return 0 }  # REPLACE INTO support
sub can_insertignore { return 0 }  # INSERT IGNORE support
sub can_insert_multi { return 0 }  # multi-row INSERT support
sub can_for_update   { return 1 }  # SELECT ... FOR UPDATE support
135
136sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
137
# SQL verb prefix giving "insert unless present" semantics, preferring
# INSERT IGNORE over REPLACE; dies if the driver supports neither.
sub ignore_replace {
    my $self = shift;
    for my $choice ([can_insertignore => "INSERT IGNORE "],
                    [can_replace      => "REPLACE "]) {
        my ($capability, $verb) = @$choice;
        return $verb if $self->$capability;
    }
    die "Can't INSERT IGNORE or REPLACE?";
}
144
# Callbacks used by mogdbsetup for progress output and interactive
# confirmation; default to a no-op and an automatic "yes" respectively.
my $on_status  = sub {};
my $on_confirm = sub { 1 };

sub on_status {
    my ($pkg, $code) = @_;
    $on_status = $code;
}

sub on_confirm {
    my ($pkg, $code) = @_;
    $on_confirm = $code;
}

sub status {
    my ($pkg, $msg) = @_;
    $on_status->($msg);
}

sub confirm {
    my ($pkg, $msg) = @_;
    $on_confirm->($msg) or die "Aborted.\n";
}
151
152sub latest_schema_version { SCHEMA_VERSION }
153
# Turn on RaiseError for this store and its current DBI handle.
sub raise_errors {
    my ($self) = @_;
    $self->{raise_errors} = 1;
    my $dbh = $self->dbh;
    $dbh->{RaiseError} = 1;
}
159
# Simple field accessors.
sub set_connect_timeout {
    my ($self, $seconds) = @_;
    $self->{connect_timeout} = $seconds;
}

sub dsn  { return $_[0]->{dsn}  }
sub user { return $_[0]->{user} }
sub pass { return $_[0]->{pass} }

sub connect_timeout { return $_[0]->{connect_timeout} }
167
# Subclass hooks: post-construction and post-connect setup (no-ops here).
sub init             { return 1 }
sub post_dbi_connect { return 1 }

# Whether this driver supports read-only slave databases.
sub can_do_slaves { return 0 }
172
# Flag this store as a slave connection; only valid for drivers whose
# can_do_slaves is true.
sub mark_as_slave {
    my ($self) = @_;
    $self->can_do_slaves or die "Incapable of becoming slave.";
    $self->{is_slave} = 1;
}

# True if this store has been marked as a slave connection.
sub is_slave {
    my ($self) = @_;
    return $self->{is_slave};
}
184
# Check the cached 'slave_version' server setting.  If it advanced past
# what we last saw, record the new version, drop all connected slaves
# (forcing reconnects under the new config) and return true.
sub _slaves_list_changed {
    my ($self) = @_;
    my $cur_version = MogileFS::Config->server_setting_cached('slave_version') || 0;
    return 0 if $cur_version <= $self->{slave_list_version};

    $self->{slave_list_version} = $cur_version;
    # Restart connections from scratch if the configuration changed.
    $self->{connected_slaves} = {};
    return 1;
}
196
# Returns a list of arrayrefs, each [$dsn, $username, $password], parsed
# from the 'slave_keys' / 'slave_<key>' server settings.  Missing or
# malformed entries are logged and skipped.
sub _slaves_list {
    my ($self) = @_;
    my $now = time();   # NOTE(review): unused here; kept as-is

    my $keys_setting = MogileFS::Config->server_setting_cached('slave_keys')
        or return ();

    my @slaves;
    for my $key (split /\s*,\s*/, $keys_setting) {
        my $entry = MogileFS::Config->server_setting_cached("slave_$key");

        unless ($entry) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $entry;
        unless (defined $dsn && defined $user && defined $pass) {
            error("key slave_$key contains $entry, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [$dsn, $user, $pass];
    }

    return @slaves;
}
224
# Return a randomly-chosen currently-connected slave store, or nothing
# if no slaves are connected.
sub _pick_slave {
    my ($self) = @_;
    my ($choice) = shuffle(keys %{ $self->{connected_slaves} });
    return unless defined $choice;
    return $self->{connected_slaves}{$choice};
}
231
# Try to establish a connection to one slave, given
# $slave_fulldsn = [$dsn, $user, $pass].  A previously-failed slave is
# skipped until its dead window passes: dead_time + (retry_timeout *
# consecutive_failures).  On success the slave is remembered in
# {connected_slaves} and its backoff cleared; on failure the dead
# timestamp and backoff counter are updated.
sub _connect_slave {
    my $self = shift;
    my $slave_fulldsn = shift;
    my $now = time();

    my $dead_retry =
        MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;

    my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
    my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
    # Still inside the dead window: don't retry yet.
    return if (defined $dead_timeout
        && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
    # Already connected to this slave: nothing to do.
    return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});

    my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
    $newslave->set_connect_timeout(
        MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
    $self->{slave}->{next_check} = 0;
    $newslave->mark_as_slave;
    if ($self->check_slave) {
        $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
        $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
    } else {
        # Magic numbers are saddening...
        # Cap the backoff multiplier so retry intervals stop growing.
        $dead_backoff++ unless $dead_backoff > 20;
        $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
        $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
    }
}
261
# Return a usable slave store, or nothing (caller falls back to the
# master).  Retries dead slaves whose backoff window expired, prefers an
# already-connected slave when the config hasn't changed, and rebuilds
# the connection set when the config changed or the picked slave failed
# its health check.
sub get_slave {
    my $self = shift;

    die "Incapable of having slaves." unless $self->can_do_slaves;

    $self->{slave} = undef;
    # Give dead slaves whose retry window may have expired a chance to
    # reconnect; drop dead-list entries no longer in the cached config.
    foreach my $slave (keys %{$self->{dead_slaves}}) {
        my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
        unless ($full_dsn) {
            delete $self->{dead_slaves}->{$slave};
            next;
        }
        $self->_connect_slave($full_dsn);
    }

    # Fast path: config unchanged — pick a connected slave and verify it.
    unless ($self->_slaves_list_changed) {
        if ($self->{slave} = $self->_pick_slave) {
            $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
            return $self->{slave} if $self->check_slave;
        }
    }

    # The picked slave failed its check: mark it dead and disconnect it.
    if ($self->{slave}) {
        my $dsn = $self->{slave}->{dsn};
        $self->{dead_slaves}->{$dsn} = time();
        $self->{dead_backoff}->{$dsn} = 0;
        delete $self->{connected_slaves}->{$dsn};
        error("Error talking to slave: $dsn");
    }
    my @slaves_list = $self->_slaves_list;

    # If we have no slaves, then return silently.
    return unless @slaves_list;

    my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');

    # Let plugins filter/reorder the candidate slave list unless disabled.
    unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
        MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
    }

    $self->{slave_list_cache} = \@slaves_list;

    foreach my $slave_fulldsn (@slaves_list) {
        $self->_connect_slave($slave_fulldsn);
    }

    if ($self->{slave} = $self->_pick_slave) {
        return $self->{slave};
    }
    warn "Slave list exhausted, failing back to master.";
    return;
}
314
# Store to use for read traffic: a slave when slave reads are enabled
# (see slaves_ok) and one is available, otherwise ourself.
sub read_store {
    my ($self) = @_;
    return $self unless $self->can_do_slaves;
    if ($self->{slave_ok}) {
        my $slave = $self->get_slave;
        return $slave if $slave;
    }
    return $self;
}
328
# Run $coderef with slave reads enabled for the duration of the call;
# any remaining arguments are passed through to it.  Returns nothing
# unless given a code reference.
sub slaves_ok {
    my ($self, $coderef, @args) = @_;
    return unless ref $coderef eq 'CODE';
    local $self->{slave_ok} = 1;
    return $coderef->(@args);
}
339
# Request that the next dbh() call re-verify the cached handle.
sub recheck_dbh {
    my ($self) = @_;
    $self->{recheck_req_gen}++;
}
344
# Return a (possibly cached) DBI handle, reconnecting as needed.  A
# cached handle is re-verified (ping) when recheck_dbh() was called
# since the last check, and recycled after {max_handles} verifications
# (works around a Solaris/Postgres memory leak).  Dies rather than
# reconnect while a lock is held; returns nothing if the monitor has
# flagged the master as down.
sub dbh {
    my $self = shift;

    if ($self->{dbh}) {
        if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
            $self->{dbh} = undef unless $self->{dbh}->ping;
            # Handles a memory leak under Solaris/Postgres.
            # We may leak a little extra memory if we're holding a lock,
            # since dropping a connection mid-lock is fatal
            $self->{dbh} = undef if ($self->{max_handles} &&
                $self->{handles_left}-- < 0 && !$self->{lock_depth});
            $self->{recheck_done_gen} = $self->{recheck_req_gen};
        }
        return $self->{dbh} if $self->{dbh};
    }

    # Shortcut flag: if monitor thinks the master is down, avoid attempting to
    # connect to it for now. If we already have a connection to the master,
    # keep using it as above.
    if (!$self->is_slave) {
        my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
        return if (defined $flag && $flag == 0);;
    }

    # auto-reconnect is unsafe if we're holding a lock
    if ($self->{lock_depth}) {
        die "DB connection recovery unsafe, lock held: $self->{last_lock}";
    }

    # Bound the connect attempt to {connect_timeout} seconds via SIGALRM.
    eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm($self->connect_timeout);
        $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
            PrintError => 0,
            AutoCommit => 1,
            # FUTURE: will default to on (have to validate all callers first):
            RaiseError => ($self->{raise_errors} || 0),
            sqlite_use_immediate_transaction => 1,
        });
    };
    alarm(0);
    if ($@ eq "timeout\n") {
        die "Failed to connect to database: timeout";
    } elsif ($@) {
        die "Failed to connect to database: " . DBI->errstr;
    }
    $self->post_dbi_connect;
    $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
    return $self->{dbh};
}
395
396sub have_dbh { return 1 if $_[0]->{dbh}; }
397
# Ping the database through the (possibly freshly-connected) handle.
sub ping {
    my ($self) = @_;
    return $self->dbh->ping;
}
402
# If the last DBI operation set an error, croak with caller context
# (rolling back first when inside a transaction); otherwise return 1.
sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    return 1 unless $dbh->err;

    my ($pkg, $fn, $line) = caller;
    my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
    $msg .= ": $optmsg" if $optmsg;
    # Auto rollback failures around transactions.
    eval { $dbh->rollback } if $dbh->{AutoCommit} == 0;
    croak($msg);
}
414
# Run a SQL statement; on any failure, warn with the SQL text and
# confess with the underlying error.  Returns do()'s result on success.
sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    my $failed = $@ || $self->dbh->err;
    return $rv unless $failed;
    warn "Error with SQL: $sql\n";
    Carp::confess($@ || $self->dbh->errstr);
}
422
# Unpack a named-parameter list, permitting only the keys in @$vlist.
# Croaks on an odd-length argument list or unrecognized keys; returns
# the filtered pairs (missing keys come back as undef).
sub _valid_params {
    croak("Odd number of parameters!") if scalar(@_) % 2;
    my ($self, $vlist, %uarg) = @_;
    my %ret;
    foreach my $key (@$vlist) {
        $ret{$key} = delete $uarg{$key};
    }
    croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
    return %ret;
}
431
# Whether the last DBI error was a deadlock; drivers must override.
sub was_deadlock_error {
    my ($self) = @_;
    my $dbh = $self->dbh;
    die "UNIMPLEMENTED";
}

# Whether the last DBI error was a duplicate-key violation; drivers
# must override.
sub was_duplicate_error {
    my ($self) = @_;
    my $dbh = $self->dbh;
    die "UNIMPLEMENTED";
}
443
# run a subref (presumably a database update) in an eval, because you expect it to
# maybe fail on duplicate key error, and throw a dup exception for you, else return
# its return value
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    # Capture $@ immediately: was_duplicate_error may run its own evals
    # and clobber the global $@ before we get to re-raise it.
    my $err = $@;
    throw("dup") if $self->was_duplicate_error;
    croak($err) if $err;
    return $rv;
}
454
# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if the duplicate errors causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;
    if ($self->can_insertignore) {
        return $dbh->do("INSERT IGNORE $sql", @params);
    } else {
        # TODO: Detect bad multi-row insert here.
        my $rv = eval { $dbh->do("INSERT $sql", @params); };
        if ($@ || $dbh->err) {
            # Duplicate key is the expected failure mode: report success.
            return 1 if $self->was_duplicate_error;
            # This chunk is identical to condthrow, but we include it directly
            # here as we know there is definitely an error, and we would like
            # to report the caller of this function.
            my ($pkg, $fn, $line) = caller;
            my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
            croak($msg);
        }
        return $rv;
    }
}
481
# Run $code, retrying up to $tries times (default 3) when the failure
# was a deadlock; non-deadlock exceptions croak immediately.
# NOTE(review): if every attempt deadlocks, the loop exits and the last
# (failed) eval's value is returned instead of dying — callers may see
# undef.  Confirm this is intended before changing.
sub retry_on_deadlock {
    my $self  = shift;
    my $code  = shift;
    my $tries = shift || 3;
    croak("deadlock retries must be positive") if $tries < 1;
    my $rv;

    while ($tries-- > 0) {
        $rv = eval { $code->(); };
        next if ($self->was_deadlock_error);
        croak($@) if $@;
        last;
    }
    return $rv;
}
497
498# --------------------------------------------------------------------------
499
# Plugin-provided tables to create in addition to the core TABLES list.
my @extra_tables;

# Class method: register additional table names for setup_database.
sub add_extra_tables {
    my ($class, @tables) = @_;
    push @extra_tables, @tables;
}
506
# Core tables every MogileFS store must have; each has a matching
# TABLE_<name> method below supplying its CREATE TABLE DDL.
use constant TABLES => qw( domain class file tempfile file_to_delete
                            unreachable_fids file_on file_on_corrupt host
                            device server_settings file_to_replicate
                            file_to_delete_later fsck_log file_to_queue
                            file_to_delete2 checksum);
512
# Create or upgrade the schema to SCHEMA_VERSION.  Returns 1 on
# success; dies if the database is newer than this code understands.
sub setup_database {
    my $sto = shift;

    my $curver    = $sto->schema_version;
    my $latestver = SCHEMA_VERSION;

    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;
    }
    if ($curver > $latestver) {
        die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver.  Aborting to be safe.\n";
    }
    # Only prompt when upgrading an existing (nonzero-version) schema.
    if ($curver) {
        $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
    }

    # Create any missing tables (create_table skips existing ones).
    $sto->create_table($_) for (TABLES, @extra_tables);

    # Apply incremental column/enum alterations for older schemas,
    # in the order the schema versions introduced them.
    $sto->$_ for qw(
        upgrade_add_host_getport
        upgrade_add_host_altip
        upgrade_add_device_asof
        upgrade_add_device_weight
        upgrade_add_device_readonly
        upgrade_add_device_drain
        upgrade_add_class_replpolicy
        upgrade_modify_server_settings_value
        upgrade_add_file_to_queue_arg
        upgrade_modify_device_size
        upgrade_add_class_hashtype
        upgrade_add_host_readonly
    );

    return 1;
}
551
# Schema version, memoized for the life of this store object.
sub cached_schema_version {
    my ($self) = @_;
    $self->{_cached_schema_version} ||= $self->schema_version;
    return $self->{_cached_schema_version};
}
557
# Read the schema version from server_settings; 0 when the row or the
# table doesn't exist yet (e.g. a brand-new database).
sub schema_version {
    my ($self) = @_;
    my $dbh = $self->dbh;
    my $version = eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
    };
    return $version || 0;
}
565
566sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
567
# Create $table (and its indexes) unless it already exists.  DDL comes
# from the TABLE_$table method, optionally rewritten by
# filter_create_sql; indexes come from an optional INDEXES_$table
# method.  Dies on any DDL failure.
sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);

    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;

    # Not every table defines indexes; a missing INDEXES_$table method
    # just yields an empty list here.
    my $imeth = "INDEXES_$table";
    my @indexes = eval { $self->$imeth };
    # Use a fresh loop variable; the original reused $sql (the table
    # DDL) as the loop variable, which was confusing and fragile.
    foreach my $index_sql (@indexes) {
        $self->status("Running SQL: $index_sql;");
        $dbh->do($index_sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;
    }
    return 1;
}
586
587# Please try to keep all tables aligned nicely
588# with '"CREATE TABLE' on the first line
589# and ')"' alone on the last line.
590
sub TABLE_domain {
    # classes are tied to domains.  domains can have classes of items
    # with different mindevcounts.
    #
    # a minimum devcount is the number of copies the system tries to
    # maintain for files in that class
    #
    # unspecified classname means classid=0 (implicit class), and that
    # implies mindevcount=2
    "CREATE TABLE domain (
    dmid         SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
    namespace    VARCHAR(255),
    UNIQUE (namespace)
    )"
}

# per-domain storage classes; (dmid,classid) is the key and classname is
# unique within a domain.  The replpolicy column is added later by
# upgrade_add_class_replpolicy, so it's absent from this base DDL.
sub TABLE_class {
    "CREATE TABLE class (
    dmid          SMALLINT UNSIGNED NOT NULL,
    classid       TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname     VARCHAR(50),
    UNIQUE      (dmid,classname),
    mindevcount   TINYINT UNSIGNED NOT NULL,
    hashtype  TINYINT UNSIGNED
    )"
}

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  application can handle that.
#
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
# each domain can setup classes and assign the minimum redundancy level for
# each class.  fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
sub TABLE_file {
    "CREATE TABLE file (
    fid          INT UNSIGNED NOT NULL,
    PRIMARY KEY  (fid),

    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey           VARCHAR(255),     # domain-defined
    UNIQUE dkey  (dmid, dkey),

    length        BIGINT UNSIGNED,   # big limit

    classid       TINYINT UNSIGNED NOT NULL,
    devcount      TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)
    )"
}

# in-progress uploads: a fid is reserved here (AUTO_INCREMENT) before
# the real file row exists.  devids presumably lists candidate target
# devices — format not enforced here.
sub TABLE_tempfile {
    "CREATE TABLE tempfile (
    fid          INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY  (fid),

    createtime   INT UNSIGNED NOT NULL,
    classid      TINYINT UNSIGNED NOT NULL,
    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey           VARCHAR(255),
    devids       VARCHAR(60)
    )"
}
660
# files marked for death when their key is overwritten.  then they get a new
# fid, but since the old row (with the old fid) had to be deleted immediately,
# we need a place to store the fid so an async job can delete the file from
# all devices.
sub TABLE_file_to_delete {
    "CREATE TABLE file_to_delete (
    fid  INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid)
    )"
}

# if the replicator notices that a fid has no sources, that file gets inserted
# into the unreachable_fids table.  it is up to the application to actually
# handle fids stored in this table.
sub TABLE_unreachable_fids {
    "CREATE TABLE unreachable_fids (
    fid        INT UNSIGNED NOT NULL,
    lastupdate INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),
    INDEX (lastupdate)
    )"
}

# what files are on what devices?  (most likely physical devices,
# as logical devices of RAID arrays would be costly, and mogilefs
# already handles redundancy)
#
# the devid index lets us answer "What files were on this now-dead disk?"
sub TABLE_file_on {
    "CREATE TABLE file_on (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid),
    INDEX (devid)
    )"
}

# if application or framework detects an error in one of the duplicate files
# for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up w/ an async job
# MAYBE: let application tell us the SHA1/MD5 of the file for us to check
#        on the other devices?
sub TABLE_file_on_corrupt {
    "CREATE TABLE file_on_corrupt (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid)
    )"
}

# hosts (which contain devices...)
# note: the 'readonly' host status is added by a later upgrade (schema
# 17), so it's not part of this base ENUM.
sub TABLE_host {
    "CREATE TABLE host (
    hostid     MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

    status     ENUM('alive','dead','down'),
    http_port  MEDIUMINT UNSIGNED DEFAULT 7500,
    http_get_port MEDIUMINT UNSIGNED,

    hostname   VARCHAR(40),
    hostip     VARCHAR(15),
    altip      VARCHAR(15),
    altmask    VARCHAR(18),
    UNIQUE     (hostname),
    UNIQUE     (hostip),
    UNIQUE     (altip)
    )"
}
729
# disks...
# note: the 'drain' device status is added by a later upgrade (schema
# 9), so it's not part of this base ENUM.
sub TABLE_device {
    "CREATE TABLE device (
    devid   MEDIUMINT UNSIGNED NOT NULL,
    hostid     MEDIUMINT UNSIGNED NOT NULL,

    status  ENUM('alive','dead','down'),
    weight  MEDIUMINT DEFAULT 100,

    mb_total   INT UNSIGNED,
    mb_used    INT UNSIGNED,
    mb_asof    INT UNSIGNED,
    PRIMARY KEY (devid),
    INDEX   (status)
    )"
}

# cluster-wide key/value settings store; also holds 'schema_version'
# (see schema_version above).
sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field   VARCHAR(50) PRIMARY KEY,
    value   TEXT
    )"
}

sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unix timestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry    INT UNSIGNED NOT NULL,
    INDEX (nexttry),
    fromdevid  INT UNSIGNED,
    failcount  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags      SMALLINT UNSIGNED NOT NULL DEFAULT 0
    )"
}

# deletions deferred until the 'delafter' unix timestamp has passed.
sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    fid  INT UNSIGNED NOT NULL PRIMARY KEY,
    delafter INT UNSIGNED NOT NULL,
    INDEX (delafter)
    )"
}

# append-only log of fsck events (time, fid, event code, device).
sub TABLE_fsck_log {
    "CREATE TABLE fsck_log (
    logid  INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (logid),
    utime  INT UNSIGNED NOT NULL,
    fid    INT UNSIGNED NULL,
    evcode CHAR(4),
    devid  MEDIUMINT UNSIGNED,
    INDEX(utime)
    )"
}
791
# generic queue table, designed to be used for workers/jobs which aren't
# constantly in use, and are async to the user.
# ie; fsck, drain, rebalance.
sub TABLE_file_to_queue {
    "CREATE TABLE file_to_queue (
    fid       INT UNSIGNED NOT NULL,
    devid     INT UNSIGNED,
    type      TINYINT UNSIGNED NOT NULL,
    nexttry   INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    flags     SMALLINT UNSIGNED NOT NULL default '0',
    arg       TEXT,
    PRIMARY KEY (fid, type),
    INDEX type_nexttry (type,nexttry)
    )"
}

# new style async delete table.
# this is separate from file_to_queue since deletes are more actively used,
# and partitioning on 'type' doesn't always work so well.
sub TABLE_file_to_delete2 {
    "CREATE TABLE file_to_delete2 (
    fid INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    INDEX nexttry (nexttry)
    )"
}

# one checksum per fid: hashtype identifies the algorithm, checksum
# holds the raw digest (up to 64 bytes).
sub TABLE_checksum {
    "CREATE TABLE checksum (
    fid INT UNSIGNED NOT NULL PRIMARY KEY,
    hashtype TINYINT UNSIGNED NOT NULL,
    checksum VARBINARY(64) NOT NULL
    )"
}
828
829# these five only necessary for MySQL, since no other database existed
830# before, so they can just create the tables correctly to begin with.
831# in the future, there might be new alters that non-MySQL databases
832# will have to implement.
# Early alters that only MySQL ever needed; other drivers create the
# tables correctly from the start, so these are no-ops here.
sub upgrade_add_host_getport    { 1 }
sub upgrade_add_host_altip      { 1 }
sub upgrade_add_device_asof     { 1 }
sub upgrade_add_device_weight   { 1 }
sub upgrade_add_device_readonly { 1 }

# These alters postdate the non-MySQL drivers, so every driver must
# supply its own implementation.
sub upgrade_add_device_drain             { die "Not implemented in $_[0]" }
sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
sub upgrade_add_file_to_queue_arg        { die "Not implemented in $_[0]" }
sub upgrade_modify_device_size           { die "Not implemented in $_[0]" }
842
# Add class.replpolicy (schema version 10) when missing.
sub upgrade_add_class_replpolicy {
    my ($self) = @_;
    return if $self->column_type("class", "replpolicy");
    $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
}
849
# Add class.hashtype (schema version 15) when missing.
sub upgrade_add_class_hashtype {
    my ($self) = @_;
    return if $self->column_type("class", "hashtype");
    $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
}
856
857# return true if deleted, 0 if didn't exist, exception if error
# Delete one host row; returns do()'s result (rowcount, or 0E0 when
# nothing matched).
sub delete_host {
    my ($self, $hostid) = @_;
    my $dbh = $self->dbh;
    return $dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
}
862
863# return true if deleted, 0 if didn't exist, exception if error
# Delete a domain (and its implicit default class, classid 0) inside a
# transaction.  Throws "has_files"/"has_classes" when the domain is
# still in use; database errors croak via condthrow (which also rolls
# back).  Returns the DELETE rowcount on success.
sub delete_domain {
    my ($self, $dmid) = @_;
    my $dbh = $self->dbh;
    my ($err, $rv);
    eval {
        $dbh->begin_work;
        if ($self->domain_has_files($dmid)) {
            $err = "has_files";
        }
        elsif ($self->domain_has_classes($dmid)) {
            $err = "has_classes";
        }
        else {
            $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);

            # remove the "default" class if one was created (for mindevcount);
            # this is currently the only way to delete the "default" class
            $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
            $dbh->commit;
        }
        $dbh->rollback if $err;
    };
    $self->condthrow; # will rollback on errors
    throw($err) if $err;
    return $rv;
}
888
# True (1/0) if any file row exists in the given domain.
sub domain_has_files {
    my ($self, $dmid) = @_;
    my ($fid) = $self->dbh->selectrow_array(
        'SELECT fid FROM file WHERE dmid = ? LIMIT 1', undef, $dmid);
    return $fid ? 1 : 0;
}
895
# True if the domain has any explicitly-created classes.  The default
# class (classid 0) is excluded: queryworker does not permit removing
# it, so it must not block domain deletion.
sub domain_has_classes {
    my ($self, $dmid) = @_;
    my ($classid) = $self->dbh->selectrow_array(
        'SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
        undef, $dmid);
    return defined($classid);
}
904
# True (1/0) if any file row exists in the given domain/class.
sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my ($fid) = $self->dbh->selectrow_array(
        'SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
        undef, $dmid, $clid);
    return $fid ? 1 : 0;
}
911
# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
# NOTE(review): the implicit 'default' class gets classid 0, so that
# one case returns 0 despite the comment above — confirm callers handle it.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    my ($clsid, $rv);

    eval {
        $dbh->begin_work;
        if ($classname eq 'default') {
            # 'default' is the implicit class and always has classid 0.
            $clsid = 0;
        } else {
            # get the max class id in this domain
            my $maxid = $dbh->selectrow_array
                ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
            $clsid = $maxid + 1;
        }
        # now insert the new class
        $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                       undef, $dmid, $clsid, $classname, 2);
        $dbh->commit if $rv;
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            # ensure we're not inside a transaction
            if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
            throw("dup");
        }
    }
    $self->condthrow; # this will rollback on errors
    return $clsid if $rv;
    die;
}
946
947# return 1 on success, throw "dup" on duplicate name error, die otherwise
# Rename a class.  Returns 1 on success, throws "dup" when the new name
# already exists in the domain, dies on any other database error.
sub update_class_name {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @_);
    eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, @arg{qw(classname dmid classid)});
    };
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    return 1;
}
959
960# return 1 on success, die otherwise
# Set a class's minimum replica count.  Returns 1 on success, dies otherwise.
sub update_class_mindevcount {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    eval {
        $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                       undef, @arg{qw(mindevcount dmid classid)});
    };
    $self->condthrow;
    return 1;
}
971
972# return 1 on success, die otherwise
# Set a class's replication policy string.  Returns 1 on success, dies otherwise.
sub update_class_replpolicy {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid replpolicy)], @_);
    eval {
        $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
                       undef, @arg{qw(replpolicy dmid classid)});
    };
    $self->condthrow;
    return 1;
}
983
984# return 1 on success, die otherwise
sub update_class_hashtype {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid hashtype)], @_);
    eval {
    $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
                   undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    # return 1 like the sibling update_class_* setters; previously this
    # leaked condthrow's return value despite the documented contract
    return 1;
}
994
# Count files in a domain + class with exactly the given cached devcount.
sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    my ($nfiles) = $self->dbh->selectrow_array(
        'SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
        undef, $dmid, $classid, $devcount);
    return $nfiles;
}
1000
# Store (or, when $val is undef, remove) a persistent server setting.
# Returns 1 on success, dies on database error.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    eval {
        defined $val
            ? $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val)
            : $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    };

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}
1017
1018# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
# Add $val (default 1) to a numeric server setting, creating the row
# if it doesn't exist yet.  No-op when the increment is zero.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    my $rows = $self->dbh->do("UPDATE server_settings ".
                              "SET value=value+? ".
                              "WHERE field=?", undef,
                              $val, $key);
    return 1 if $rows > 0;
    # row didn't exist; create it with the increment as initial value
    $self->set_server_setting($key, $val);
}
1030
# Fetch a single server setting's value (undef when not set).
sub server_setting {
    my ($self, $key) = @_;
    my ($value) = $self->dbh->selectrow_array(
        "SELECT value FROM server_settings WHERE field=?", undef, $key);
    return $value;
}
1036
# Return all server settings as a hashref of field => value.
sub server_settings {
    my ($self) = @_;
    my %settings;
    my $rows = $self->dbh->selectall_arrayref("SELECT field, value FROM server_settings");
    $settings{ $_->[0] } = $_->[1] for @{ $rows || [] };
    return \%settings;
}
1047
1048# register a tempfile and return the fidid, which should be allocated
1049# using autoincrement/sequences if the passed in fid is undef.  however,
1050# if fid is passed in, that value should be used and returned.
1051#
1052# return new/passed in fidid on success.
1053# throw 'dup' if fid already in use
1054# return 0/undef/die on failure
1055#
sub register_tempfile {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    # remember whether the caller chose the fid: a collision then means
    # the caller's fid is taken ("dup"), not an auto-increment problem
    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?'   , '?'   , '?'      , '?'     , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Do not check for $explicit_fid_used, but rather $fid directly
            # as this anonymous sub is called from the loop later
            if($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
            $dbh->do($sql, undef, @vals);
        };
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    # closes over $fid; the argument passed at the call site is ignored
    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # See notes in MogileFS::Config->check_database
    # NOTE(review): assumes min_fidid is always defined in config — the
    # numeric compare below would warn otherwise; confirm against Config
    my $min_fidid = MogileFS::Config->config('min_fidid');

    # if the fid is in use, do something
    while ($fid_in_use->($fid) || $fid <= $min_fidid) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
1140
1141# return hashref of row containing columns "fid, dmid, dkey, length,
1142# classid, devcount" provided a $dmid and $key (dkey).  or undef if no
1143# row.
# Look up one file row by domain + key; hashref of
# (fid, dmid, dkey, length, classid, devcount), or undef when absent.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount ".
              "FROM file WHERE dmid=? AND dkey=?";
    return $self->dbh->selectrow_hashref($sql, undef, $dmid, $key);
}
1150
1151# return hashref of row containing columns "fid, dmid, dkey, length,
1152# classid, devcount" provided a $fidid or undef if no row.
# Look up one file row by fid; hashref of
# (fid, dmid, dkey, length, classid, devcount), or undef when absent.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount ".
              "FROM file WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1159
1160# return an arrayref of rows containing columns "fid, dmid, dkey, length,
1161# classid, devcount" provided a pair of $fidid or undef if no rows.
# Page through the file table: up to $count rows with fid > $fromfid,
# returned as an arrayref of hashrefs.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $count) = @_;
    my $select = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                     "FROM file WHERE fid > ? LIMIT ?");
    $select->execute($fromfid, $count);
    return $select->fetchall_arrayref({});
}
1169
1170# return array of devids that a fidid is on
# List of devids a fid currently lives on (possibly empty).
sub fid_devids {
    my ($self, $fidid) = @_;
    my $devids = $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                                undef, $fidid);
    return @{ $devids || [] };
}
1176
1177# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of
# given @fidids; fids with no file_on rows are simply absent from the map.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};
    # an empty "IN ()" list is a SQL syntax error; nothing to look up anyway
    return $ret unless @fidids;
    my $in = join(",", map { $_+0 } @fidids);  # numify before interpolating
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}
1189
1190# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
# Look up a tempfile row by fid; hashref of (classid, dmid, dkey, devids),
# or undef when absent.
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT classid, dmid, dkey, devids ".
              "FROM tempfile WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1197
1198# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
# Create a device row.  Returns 1 on success, throws "dup" on a
# duplicate devid, dies on any other failure.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $inserted = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    });
    $self->condthrow;
    die "error making device $devid\n" unless $inserted > 0;
    return 1;
}
1209
# Apply arbitrary column updates (hashref of column => value) to a
# device row.  Returns 1; no-op when nothing to update.
sub update_device {
    my ($self, $devid, $to_update) = @_;
    my @cols = sort keys %$to_update;
    return unless @cols;
    my $assignments = join('=?, ', @cols) . "=?";
    $self->conddup(sub {
        $self->dbh->do("UPDATE device SET $assignments WHERE devid=?", undef,
                       (map { $to_update->{$_} } @cols), $devid);
    });
    return 1;
}
1221
# Record a device's disk-usage snapshot (mb_total/mb_used plus the
# mb_asof timestamp the numbers were taken at).
sub update_device_usage {
    my $self = shift;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
    eval {
        $self->dbh->do("UPDATE device SET ".
                       "mb_total = ?, mb_used = ?, mb_asof = ?" .
                       " WHERE devid = ?",
                       undef, @arg{qw(mb_total mb_used mb_asof devid)});
    };
    $self->condthrow;
}
1234
1235# MySQL has an optimized version
# MySQL has an optimized version
# Generic fallback: one UPDATE per device, invoking $cb after each.
sub update_device_usages {
    my ($self, $updates, $cb) = @_;
    for my $usage (@$updates) {
        $self->update_device_usage(%$usage);
        $cb->();
    }
}
1243
1244# This is unimplemented at the moment as we must verify:
1245# - no file_on rows exist
1246# - nothing in file_to_queue is going to attempt to use it
1247# - nothing in file_to_replicate is going to attempt to use it
1248# - it's already been marked dead
1249# - that all trackers are likely to know this :/
1250# - ensure the devid can't be reused
1251# IE; the user can't mark it dead then remove it all at once and cause their
1252# cluster to implode.
sub delete_device {
    # see the block comment above for the invariants that must hold
    # before implementing this safely
    die "Unimplemented; needs further testing";
}
1256
# Set the selection weight for a device; dies on database error.
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    eval {
        $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?',
                       undef, $weight, $devid);
    };
    $self->condthrow;
}
1264
# Set a device's status column; dies on database error.
sub set_device_state {
    my ($self, $devid, $state) = @_;
    eval {
        $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?',
                       undef, $state, $devid);
    };
    $self->condthrow;
}
1272
# Remove a class from a domain; throws "has_files" while any file
# still references the class, dies on database error.
sub delete_class {
    my ($self, $dmid, $classid) = @_;
    throw("has_files") if $self->class_has_files($dmid, $classid);
    eval {
        $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?",
                       undef, $dmid, $classid);
    };
    $self->condthrow;
}
1281
1282# called from a queryworker process, will trigger delete_fidid_enqueued
1283# in the delete worker
# called from a queryworker process, will trigger delete_fidid_enqueued
# in the delete worker
sub delete_fidid {
    my ($self, $fid) = @_;
    # drop the namespace row first so the key vanishes immediately...
    eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fid) };
    $self->condthrow;
    # ...then queue the remaining cleanup for the delete worker
    $self->enqueue_for_delete2($fid, 0);
    $self->condthrow;
}
1291
1292# Only called from delete workers (after delete_fidid),
1293# this reduces client-visible latency from the queryworker
# Only called from delete workers (after delete_fidid),
# this reduces client-visible latency from the queryworker
sub delete_fidid_enqueued {
    my ($self, $fid) = @_;
    eval { $self->delete_checksum($fid) };
    $self->condthrow;
    eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid) };
    $self->condthrow;
}
1301
# Delete a tempfile row; returns DBI's row count from the DELETE.
sub delete_tempfile_row {
    my ($self, $fid) = @_;
    my $deleted = eval {
        $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);
    };
    $self->condthrow;
    return $deleted;
}
1308
1309# Load the specified tempfile, then delete it.  If we succeed, we were
1310# here first; otherwise, someone else beat us here (and we return undef)
# Load the specified tempfile, then delete it.  If we succeed, we were
# here first; otherwise, someone else beat us here (and we return undef)
sub delete_and_return_tempfile_row {
    my ($self, $fidid) = @_;
    my $row = $self->tempfile_row_from_fid($fidid);
    return $row if $self->delete_tempfile_row($fidid) > 0;
    return;
}
1317
# Insert-or-overwrite a file row via REPLACE; dies on database error
# or when the backend lacks REPLACE support.
sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    my @binds = @arg{qw(fidid dmid key length classid devcount)};
    eval {
        $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                       "VALUES (?,?,?,?,?,?) ", undef, @binds);
    };
    $self->condthrow;
}
1329
1330# returns 1 on success, 0 on duplicate key error, dies on exception
1331# TODO: need a test to hit the duplicate name error condition
1332# TODO: switch to using "dup" exception here?
# Re-key a file within its domain.
# Returns 1 on success, 0 when the target key is already taken,
# dies on any other database error.
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # a unique-key violation on dkey means the target name is in use
        return 0 if $self->was_duplicate_error;
        # $@ can be empty when only $dbh->err fired; fall back to errstr
        # so we never die with a bare "Died" message
        die $@ || $dbh->errstr;
    }
    $self->condthrow;
    return 1;
}
1351
# Map a domain namespace string to its dmid (undef when unknown).
sub get_domainid_by_name {
    my ($self, $name) = @_;
    my ($dmid) = $self->dbh->selectrow_array(
        'SELECT dmid FROM domain WHERE namespace = ?', undef, $name);
    return $dmid;
}
1358
1359# returns a hash of domains. Key is namespace, value is dmid.
# returns a hash of domains. Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $rows = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { @$_ } @{ $rows || [] };
}
1365
# Map a (dmid, classname) pair to its classid (undef when unknown).
sub get_classid_by_name {
    my ($self, $dmid, $classname) = @_;
    my ($classid) = $self->dbh->selectrow_array(
        'SELECT classid FROM class WHERE dmid = ? AND classname = ?',
        undef, $dmid, $classname);
    return $classid;
}
1372
1373# returns an array of hashrefs, one hashref per row in the 'class' table
# returns an array of hashrefs, one hashref per row in the 'class' table
sub get_all_classes {
    my ($self) = @_;

    # newer schemas grew extra per-class columns; only select what exists
    # (schema 15 implies schema 10, so the checks can be flat)
    my @cols = qw/dmid classid classname mindevcount/;
    push @cols, 'replpolicy' if $self->cached_schema_version >= 10;
    push @cols, 'hashtype'   if $self->cached_schema_version >= 15;

    my $sth = $self->dbh->prepare("SELECT " . join(', ', @cols) . " FROM class");
    $sth->execute;
    my @classes;
    while (my $row = $sth->fetchrow_hashref) {
        push @classes, $row;
    }
    return @classes;
}
1391
1392# add a record of fidid existing on devid
1393# returns 1 on success, 0 on duplicate
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $rows = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                             undef, $fidid, $devid);
    return $rows > 0 ? 1 : 0;
}
1407
1408# remove a record of fidid existing on devid
1409# returns 1 on success, 0 if not there anyway
# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $deleted = eval {
        $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                       undef, $fidid, $devid);
    };
    $self->condthrow;
    return $deleted;
}
1417
1418# Test if host exists.
# Test if host exists.  Returns the hostid, or undef when unknown.
sub get_hostid_by_id {
    my ($self, $id) = @_;
    my ($hostid) = $self->dbh->selectrow_array(
        'SELECT hostid FROM host WHERE hostid = ?', undef, $id);
    return $hostid;
}
1425
# Map a hostname to its hostid (undef when unknown).
sub get_hostid_by_name {
    my ($self, $name) = @_;
    my ($hostid) = $self->dbh->selectrow_array(
        'SELECT hostid FROM host WHERE hostname = ?', undef, $name);
    return $hostid;
}
1432
1433# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @hosts;
    while (my $host = $sth->fetchrow_hashref) {
        push @hosts, $host;
    }
    return @hosts;
}
1445
1446# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    # NOTE(review): condthrow between prepare and execute looks odd;
    # kept as-is since it surfaces any pending handle error
    $self->condthrow;
    $sth->execute;
    my @devices;
    while (my $device = $sth->fetchrow_hashref) {
        push @devices, $device;
    }
    return @devices;
}
1459
1460# update the device count for a given fidid
# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    # count the live file_on rows, then cache that count on the file row
    my ($count) = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                        undef, $fidid);
    eval {
        $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef, $count, $fidid);
    };
    $self->condthrow;

    return 1;
}
1473
1474# update the classid for a given fidid
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;

    # wrap in eval like the other update_* helpers so a RaiseError
    # handle doesn't die before condthrow gets to handle the failure
    eval {
        $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
                  $classid, $fidid);
    };

    $self->condthrow;
    return 1;
}
1485
1486# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    # $nexttry is interpolated as a SQL expression ("now + delay"),
    # not bound; 0 means eligible immediately
    my $nexttry = $in ? $self->unix_timestamp . " + " . int($in) : 0;

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
1500
1501# enqueue a fidid for delete
1502# note: if we get one more "independent" queue like this, the
1503# code should be collapsable? I tried once and it looked too ugly, so we have
1504# some redundancy.
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# some redundancy.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    # nexttry is a SQL expression (now + delay), interpolated below
    my $nexttry = $self->unix_timestamp . " + " . int($in || 0);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}
1516
1517# enqueue a fidid for work
# enqueue a fidid for work
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    # nexttry is a SQL expression (now + delay), interpolated below
    my $nexttry = $self->unix_timestamp . " + " . int($in || 0);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            # arrayref form carries [fid, devid, arg]
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 @$fidid[0..2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}
1535
1536# return 1 on success.  die otherwise.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    # without multi-row INSERT IGNORE/REPLACE support, fall back to
    # enqueueing one at a time
    if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
        return 1;
    }

    $in = 0 unless $in;
    # nexttry is a SQL expression (now + delay), interpolated, not bound
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        if (ref($fidids->[0]) eq 'ARRAY') {
            # rows are [fid, devid, arg] triples
            my $sql =  $self->ignore_replace .
                "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
                join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
            $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
        } else {
            # bare fid list; values are int()'d and inlined, no binds
            $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
            nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
        }
    });
    $self->condthrow;
}
1562
1563# For file_to_queue queues that should be kept small, find the size.
1564# This isn't fast, but for small queues won't be slow, and is usually only ran
1565# from a single tracker.
# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only ran
# from a single tracker.
sub file_queue_length {
    my ($self, $type) = @_;
    my ($length) = $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
           "WHERE type = ?", undef, $type);
    return $length;
}
1573
1574# reschedule all deferred replication, return number rescheduled
# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;

    my $now = $self->unix_timestamp;
    $self->retry_on_deadlock(sub {
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $now .
                              " WHERE nexttry > " . $now);
    });
}
1583
1584# takes two arguments, devid and limit, both required. returns an arrayref of fidids.
# takes two arguments, devid and limit, both required. returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    # $limit is interpolated into the SQL; force it numeric like
    # get_fidids_between does, so nothing unexpected reaches the query
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}
1593
1594# finds a chunk of fids given a set of constraints:
1595# devid, fidid, age (new or old), limit
1596# Note that if this function is very slow on your large DB, you're likely
1597# sorting by "newfiles" and are missing a new index.
1598# returns an arrayref of fidids
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;
    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    $age ||= 'old';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }
    $limit ||= 100;
    # interpolated into the SQL below; force numeric like get_fidids_between
    $limit = int($limit);
    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}
1630
1631# gets fidids above fidid_low up to (and including) fidid_high
# gets fidids above fidid_low up to (and including) fidid_high
sub get_fidids_between {
    my ($self, $fidid_low, $fidid_high, $limit) = @_;
    # $limit is interpolated, so force it numeric; default to 1000
    $limit = int($limit || 1000);

    return $self->dbh->selectcol_arrayref(qq{SELECT fid FROM file
        WHERE fid > ? and fid <= ?
        ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
}
1643
1644# creates a new domain, given a domain namespace string.  return the dmid on success,
1645# throw 'dup' on duplicate name.
1646# override if you want a less racy version.
# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # pick the next dmid by hand (racy; see FIXME below)
    my $next_dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;
    my $ok = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $next_dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;
    return $next_dmid if $ok;
    die "failed to make domain";  # FIXME: the above is racy.
}
1663
# Apply arbitrary column updates (hashref of column => value) to a
# host row.  Returns 1; no-op when nothing to update.
sub update_host {
    my ($self, $hid, $to_update) = @_;
    my @cols = sort keys %$to_update;
    return unless @cols;
    my $assignments = join('=?, ', @cols) . "=?";
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $assignments WHERE hostid=?", undef,
                       (map { $to_update->{$_} } @cols), $hid);
    });
    return 1;
}
1675
1676# return ne hostid, or throw 'dup' on error.
1677# NOTE: you need to put them into the initial 'down' state.
# return ne hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $new_hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $ok = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $new_hid, $hostname, $ip);
    });
    return $new_hid if $ok;
    die "db failure";
}
1691
1692# return array of row hashrefs containing columns: (fid, fromdevid,
1693# failcount, flags, nexttry)
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry) for entries whose nexttry has passed.
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $now = $self->unix_timestamp;
    my $rows = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $now
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$rows;
}
1706
1707# "new" style queue consumption code.
1708# from within a transaction, fetch a limit of fids,
1709# then update each fid's nexttry to be off in the future,
1710# giving local workers some time to dequeue the items.
1711# Note:
1712# DBI (even with RaiseError) returns weird errors on
1713# deadlocks from selectall_hashref. So we can't do that.
1714# we also used to retry on deadlock within the routine,
1715# but instead lets return undef and let job_master retry.
sub grab_queue_chunk {
    my $self      = shift;
    my $queue     = shift;
    my $limit     = shift;
    my $extfields = shift;

    my $dbh = $self->dbh;
    my $tries = 3;   # NOTE(review): unused; retries now happen in job_master
    my $work;

    # another tracker holds the queue lock; let the caller try again later
    return 0 unless $self->lock_queue($queue);

    my $extwhere = shift || '';
    my $fields = 'fid, nexttry, failcount';
    $fields .= ', ' . $extfields if $extfields;
    eval {
        $dbh->begin_work;
        my $ut  = $self->unix_timestamp;
        my $query = qq{
            SELECT $fields
            FROM $queue
            WHERE nexttry <= $ut
            $extwhere
            ORDER BY nexttry
            LIMIT $limit
        };
        $query .= "FOR UPDATE\n" if $self->can_for_update;
        my $sth = $dbh->prepare($query);
        $sth->execute;
        $work = $sth->fetchall_hashref('fid');
        # Nothing to work on.
        # Now claim the fids for a while.
        # TODO: Should be configurable... but not necessary.
        my $fidlist = join(',', keys %$work);
        # (this "return" only exits the eval block, not the sub)
        unless ($fidlist) { $dbh->commit; return; }
        $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
        $dbh->commit;
    };
    if ($self->was_deadlock_error) {
        eval { $dbh->rollback };
        $work = undef;
    } else {
        $self->condthrow;
    }
    # FIXME: Super extra paranoia to prevent deadlocking.
    # Need to handle or die on all errors above, but $@ can get reset. For now
    # we'll just always ensure there's no transaction running at the end here.
    # A (near) release should figure the error detection correctly.
    if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
    $self->unlock_queue($queue);

    return defined $work ? values %$work : ();
}
1769
# Claim a batch of due rows from the replication queue, selecting the
# replication-specific columns as well.
sub grab_files_to_replicate {
    my $self  = shift;
    my $limit = shift;
    return $self->grab_queue_chunk('file_to_replicate', $limit,
                                   'fromdevid, flags');
}
1775
# Claim a batch of due rows from the file_to_delete2 queue.
sub grab_files_to_delete2 {
    my $self  = shift;
    my $limit = shift;
    return $self->grab_queue_chunk('file_to_delete2', $limit);
}
1780
# $extwhere is ugly... but should be fine.
# Claim a batch of due rows of the given numeric $type from the generic
# file_to_queue table.  $what is an optional comma-separated list of
# extra columns to select (defaults to 'type, flags').
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    $what ||= 'type, flags';
    # $type is interpolated directly into SQL by grab_queue_chunk's
    # $extwhere, so force it to an integer to keep the query well-formed.
    return $self->grab_queue_chunk('file_to_queue', $limit,
        $what, 'AND type = ' . int($type));
}
1788
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    # per-fid advisory lock; timeout of 1 second
    return $self->get_lock("mgfs:fid:$fidid:replicate", 1) ? 1 : 0;
}
1801
# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways.  so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    # release the per-fid lock taken in should_begin_replicating_fidid
    $self->release_lock("mgfs:fid:$fidid:replicate");
}
1816
# Look up a single file_to_replicate row by fid; returns a hashref or undef.
sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, nexttry, fromdevid, failcount, flags " .
              "FROM file_to_replicate WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1822
# Look up a single file_to_delete2 row by fid; returns a hashref or undef.
sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, nexttry, failcount " .
              "FROM file_to_delete2 WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1828
# Look up a single file_to_queue row by fid and queue type;
# returns a hashref or undef.
sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    my $sql = "SELECT fid, devid, type, nexttry, failcount, flags, arg " .
              "FROM file_to_queue WHERE fid = ? AND type = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid, $type);
}
1834
# Remove a fid from the replication queue, retrying on deadlock.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM file_to_replicate WHERE fid=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid);
    });
}
1841
# Remove a (fid, type) entry from the generic queue, retrying on deadlock.
sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    my $sql = "DELETE FROM file_to_queue WHERE fid=? and type=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid, $type);
    });
}
1849
# Remove a fid from the file_to_delete2 queue, retrying on deadlock.
sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM file_to_delete2 WHERE fid=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid);
    });
}
1856
# Reschedule a replication entry to an absolute unix time, bumping its
# failure counter.  Retries on deadlock.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_replicate " .
              "SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $abstime, $fid);
    });
}
1864
# Reschedule a replication entry to now + $in_n_secs (computed in SQL),
# bumping its failure counter.  Retries on deadlock.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    # unix_timestamp returns a SQL expression for "now" for this driver
    my $sql = "UPDATE file_to_replicate SET nexttry = "
            . $self->unix_timestamp
            . " + ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $in_n_secs, $fid);
    });
}
1873
# Reschedule a delete2 entry to an absolute unix time, bumping its
# failure counter.  Retries on deadlock.
sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_delete2 " .
              "SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $abstime, $fid);
    });
}
1881
# Reschedule a delete2 entry to now + $in_n_secs (computed in SQL),
# bumping its failure counter.  Retries on deadlock.
sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    # unix_timestamp returns a SQL expression for "now" for this driver
    my $sql = "UPDATE file_to_delete2 SET nexttry = "
            . $self->unix_timestamp
            . " + ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $in_n_secs, $fid);
    });
}
1890
# Given a dmid, prefix, after and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;

    # escape underscores, % and \
    $prefix =~ s/([%\\_])/\\$1/g;

    $prefix .= '%';
    $after  = '' unless defined $after;

    # $limit is interpolated directly into the SQL below, so coerce it
    # to an integer to keep the statement well-formed.
    $limit = int($limit || 0);

    my $like = $self->get_keys_like_operator;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after);
}
1912
# SQL comparison operator used by get_keys_like; subclasses may
# override (e.g. for a case-insensitive variant).
sub get_keys_like_operator { return "LIKE"; }
1914
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_old seconds ago or older.  Capped at 50 rows per call.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    # bind $secs_old rather than interpolating it into the SQL;
    # unix_timestamp is a driver-specific SQL expression for "now".
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - ? LIMIT 50",
                                          undef, $secs_old);
}
1922
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    # No multi-row INSERT support?  Insert one row per call instead.
    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) for @devfids;
        return 1;
    }

    my (@placeholders, @bind_values);
    for my $devfid (@devfids) {
        my $fidid = $devfid->fidid;
        my $devid = $devfid->devid;
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @bind_values, $fidid, $devid;
        push @placeholders, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES "
                  . join(',', @placeholders), undef, @bind_values);
    return 1;
}
1949
# Persist the current schema version into server_settings.
# NOTE: the sub name is historically misspelled ("vesion") and is kept
# because external callers use it; set_schema_version is provided as a
# correctly-spelled, backward-compatible alias.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}
*set_schema_version = \&set_schema_vesion;
1954
# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my ($self) = @_;
    my $now = $self->unix_timestamp;  # SQL expression for "now"
    my $fids = $self->dbh->selectcol_arrayref(qq{
        SELECT fid
         FROM file_to_delete_later
        WHERE delafter < $now
        LIMIT 500
    });
    return $fids ? @$fids : ();
}
1966
# Enqueue the given fidids into the file_to_delete table.
# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # nothing to do; an empty VALUES list would be a SQL syntax error
    return 1 unless @fidids;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
    # honor the documented contract: 1 on success
    return 1;
}
1984
# Enqueue the given fidids into file_to_delete2 with nexttry = now.
# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # nothing to do; an empty VALUES list would be a SQL syntax error
    return 1 unless @fidids;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    # SQL expression for "now"; items become due immediately
    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
        nexttry) VALUES " .
                       join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
    # honor the documented contract: 1 on success
    return 1;
}
2005
# clears everything from the fsck_log table
# return 1 on success.  die otherwise.
sub clear_fsck_log {
    my ($self) = @_;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
2013
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
#
# Roll up per-evcode counts from new fsck_log rows into
# "fsck_sum_evcount_*" server settings, so fsck_status can avoid a
# large GROUP BY.  Returns 0 if another process holds the summary lock.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    # eval: get_lock may die (e.g. lock recursion); treat as "unknown"
    my $lock = eval { $self->get_lock($lockname, 10) };
    # a defined 0 means we timed out waiting on the lock
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    # both inclusive:
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid++;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    # only release if we actually obtained the lock above
    $self->release_lock($lockname) if $lock;
}
2039
# Append one row to the fsck_log table.  Accepts named options
# fid, code and devid; croaks on any unrecognized option.
sub fsck_log {
    my ($self, %opts) = @_;
    my $fid   = delete $opts{fid};
    my $code  = delete $opts{code};
    my $devid = delete $opts{devid};
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef, $fid, $code, $devid);
    croak("Unknown opts") if %opts;
    $self->condthrow;

    return 1;
}
2053
# Return the database server's idea of the current unix time.
sub get_db_unixtime {
    my ($self) = @_;
    my $sql = "SELECT " . $self->unix_timestamp;
    return $self->dbh->selectrow_array($sql);
}
2058
# Highest fid currently in the file table (undef if the table is empty).
sub max_fidid {
    my ($self) = @_;
    return scalar $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}
2063
# Highest logid in the fsck_log table, or 0 when the table is empty.
sub max_fsck_logid {
    my ($self) = @_;
    my $max = $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log");
    return $max || 0;
}
2068
# returns array of $row hashrefs, from fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    # sanitize: both values are interpolated/bound into the query
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);

    my @rows;
    while (my $row = $sth->fetchrow_hashref) {
        push @rows, $row;
    }
    return @rows;
}
2088
# Count fsck_log rows per evcode, filtered either by time_gte (utime >=)
# or by logid_range (inclusive [min, max] arrayref).  If both are given,
# logid_range wins.  Returns a hashref of evcode => count; empty hashref
# when no filter is supplied.  Dies on unknown options.
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die if %opts;

    my $ret = {};
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
         });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        # NOTE: replaces the time_gte query if both options were passed
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
         });
        $sth->execute($logr->[0], $logr->[1]);
    }
    # no filter given: previously this crashed on an undef $sth
    return $ret unless $sth;
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}
2118
# run before daemonizing.  you can die from here if you see something's amiss.  or emit
# warnings.
sub pre_daemonize_checks {
    my ($self) = @_;
    $self->pre_daemonize_check_slaves;
}
2126
# Validate the configured slave DB entries (slave_keys / slave_<key>
# server settings) and hand the parsed list to the 'slave_list_check'
# global hook.  Malformed entries are reported via error() and skipped.
sub pre_daemonize_check_slaves {
    my $key_list = MogileFS::Config->server_setting('slave_keys')
        or return;

    my @slaves;
    for my $key (split /\s*,\s*/, $key_list) {
        my $slave = MogileFS::Config->server_setting("slave_$key");

        unless ($slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        # each entry is "DSN|user|pass"
        my ($dsn, $user, $pass) = split /\|/, $slave;
        unless (defined($dsn) && defined($user) && defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [$dsn, $user, $pass];
    }

    return unless @slaves; # Escape this block if we don't have a set of slaves anyways

    MogileFS::run_global_hook('slave_list_check', \@slaves);
}
2152
2153
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout.  dies if more than one lock is already outstanding.
# Abstract in the base class: this version only enforces the
# no-nested-locks invariant before dying; subclasses must override.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}).  Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}
2161
# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
# Abstract in the base class: subclasses must override.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
2168
# MySQL has an issue where you either get excessive deadlocks, or INSERT's
# hang forever around some transactions. Use ghetto locking to cope.
# Base implementations are no-ops that report success; drivers needing
# real queue locking override these.
sub lock_queue { 1 }
sub unlock_queue { 1 }

# DBI bind type used for the BLOB checksum column in set_checksum;
# undef here, overridden by drivers that need an explicit type.
sub BLOB_BIND_TYPE { undef; }
2175
# Store (or overwrite) the checksum row for a fid via REPLACE.
# Dies if the underlying store lacks REPLACE support.
sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;

    eval {
        my $stmt = $dbh->prepare(
            "REPLACE INTO checksum " .
            "(fid, hashtype, checksum) " .
            "VALUES (?, ?, ?)");
        $stmt->bind_param(1, $fidid);
        $stmt->bind_param(2, $hashtype);
        # checksum is raw bytes; use the driver's BLOB bind type if any
        $stmt->bind_param(3, $checksum, BLOB_BIND_TYPE);
        $stmt->execute;
    };
    $self->condthrow;
}
2192
# Fetch the checksum row for a fid; returns a hashref or undef.
sub get_checksum {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, hashtype, checksum " .
              "FROM checksum WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
2200
# Remove the checksum row (if any) for a fid.
sub delete_checksum {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM checksum WHERE fid = ?";
    $self->dbh->do($sql, undef, $fidid);
}
2206
# setup the value used in a 'nexttry' field to indicate that this item will
# never actually be tried again and require some sort of manual intervention.
# (2147483647 is the maximum 32-bit signed time_t.)
use constant ENDOFTIME => 2147483647;

# accessor for the ENDOFTIME sentinel
sub end_of_time { ENDOFTIME; }
2212
# returns the size of the non-urgent replication queue
# nexttry == 0                        - the file is urgent
# nexttry != 0 && nexttry < ENDOFTIME - the file is deferred
sub deferred_repl_queue_length {
    my ($self) = @_;
    my $sql = 'SELECT COUNT(*) FROM file_to_replicate ' .
              'WHERE nexttry != 0 AND nexttry < ?';
    return $self->dbh->selectrow_array($sql, undef, $self->end_of_time);
}
2221
22221;
2223
2224__END__
2225
2226=head1 NAME
2227
2228MogileFS::Store - data storage provider.  base class.
2229
2230=head1 ABOUT
2231
MogileFS aims to be database-independent.  The server creates a
singleton instance of a C<MogileFS::Store> subclass — such as
L<MogileFS::Store::MySQL>, MogileFS::Store::SQLite,
MogileFS::Store::Postgres, or MogileFS::Store::Oracle — selected from
the configured DSN, and all database interaction goes through it.
2237
2238=head1 SEE ALSO
2239
2240L<MogileFS::Store::MySQL>
2241
2242
2243