1package MogileFS::Store::Postgres;
2# vim: ts=4 sw=4 et ft=perl:
3use strict;
4use Digest::MD5 qw(md5); # Used for lockid
5use DBI;
6use DBD::Pg;
7use Sys::Hostname;
8use MogileFS::Util qw(throw debug error);
9use MogileFS::Server;
10use Carp;
11use base 'MogileFS::Store';
12
13# --------------------------------------------------------------------------
14# Package methods we override
15# --------------------------------------------------------------------------
16
# Build the DBI data source name for database $dbname on $host.
# A ";port=..." clause is appended only when $port is a true value.
sub dsn_of_dbhost {
    my ($class, $dbname, $host, $port) = @_;
    my $dsn = "DBI:Pg:dbname=$dbname;host=$host";
    $dsn .= ";port=$port" if $port;
    return $dsn;
}
21
# DSN for administrative ("root") connections: always targets the
# 'postgres' database on the given host/port; the $dbname argument is
# accepted but not used.
sub dsn_of_root {
    my $class = shift;
    my ($dbname, $host, $port) = @_;
    my $admin_db = 'postgres';
    return $class->dsn_of_dbhost($admin_db, $host, $port);
}
26
27# --------------------------------------------------------------------------
28# Store-related things we override
29# --------------------------------------------------------------------------
30
# This store answers 1 (true) when asked whether it wants raised errors.
sub want_raise_errors { return 1; }
32
# Given a root DBI connection, create the named database.  Succeeds if
# the database is created or already exists; dies otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    my $create_sql = "CREATE DATABASE $dbname TEMPLATE template0 ENCODING 'UTF-8'";
    if (!$rdbh->do($create_sql)) {
        my $reason = $rdbh->errstr;
        # An "already exists" failure is the one error we tolerate.
        if ($reason !~ /already exists/) {
            die "Failed to create database '$dbname': " . $reason . "\n";
        }
    }
}
41
# Create the login role $user (with password $pass) and make it the owner
# of database $dbname, using the root handle $rdbh.
# Dies if role creation fails for any reason other than SQLSTATE 42710,
# or if the ownership change fails.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    eval {
        $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
            undef, $pass);
    };
    # SQLSTATE values are five-character strings (they may contain
    # letters), so compare with a string operator rather than numerically.
    die "Failed to create user '$user': ". $rdbh->errstr . "\n"
        if $rdbh->err && $rdbh->state ne '42710';
    # Owning the database in postgres is important
    $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
        or die "Failed to grant privileges " . $rdbh->errstr . "\n";
}
54
# Capability flags: each of these returns 0 for this store.
sub can_replace      { return 0; }
sub can_insertignore { return 0; }
sub can_insert_multi { return 0; }

# SQL expression used wherever a current-epoch integer timestamp is needed.
sub unix_timestamp   { return "EXTRACT(epoch FROM NOW())::int4"; }
59
# Run the parent initialisation, verify the server version and reset the
# advisory-lock depth counter.  Dies if the server is older than 8.4.
sub init {
    my $self = shift;
    $self->SUPER::init;
    my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
    # We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
    # We need >=pg-8.4 for working advisory locks
    # Both alternatives are grouped so that \A anchors the whole pattern;
    # otherwise "08.0x" could match in the middle of the version string.
    die "Postgres is too old! Must use >=postgresql-8.4!"
        if $database_version =~ /\A(?:0[0-7]\.|08\.0[0123])/;
    $self->{lock_depth} = 0;
}
69
# Hook run after a DBI connection is made: defer to the parent hook, then
# reset the lock depth counter to 0.
sub post_dbi_connect {
    my ($self) = @_;
    $self->SUPER::post_dbi_connect;
    $self->{lock_depth} = 0;
}
75
# Capability flag: returns 0 (see the check_slave TODO below).
sub can_do_slaves { return 0; }
77
78# TODO: Implement later
79#sub check_slave {
80#}
81
# Returns 1 if the handle's current error has SQLSTATE 40P01,
# and 0 otherwise (including when there is no error at all).
sub was_deadlock_error {
    my $self = shift;
    my $dbh = $self->dbh;
    return 0 unless $dbh->err;
    # Explicit 0/1 result instead of relying on the value of a fall-through.
    return $dbh->state eq '40P01' ? 1 : 0;
}
88
# Returns 1 if the handle's current error is a duplicate-key failure:
# SQLSTATE 23505, or an error string containing "duplicate"
# (case-insensitive).  Returns 0 otherwise, including when no error is set.
sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;
    return 0 unless $dbh->err;
    # Explicit 0/1 result instead of relying on the value of a fall-through.
    return ($dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i) ? 1 : 0;
}
95
# Returns 1 if table_info reports a row for $table, 0 if it reports none.
# The lookup runs inside eval, so a failing lookup yields eval's failure
# value rather than an exception.
sub table_exists {
    my ($self, $table) = @_;
    return eval {
        my $info = $self->dbh->table_info(undef, undef, $table, "table");
        $info->fetchrow_hashref ? 1 : 0;
    };
}
104
# Register the extra 'lock' table, then delegate to the parent
# setup_database and return its result.
sub setup_database {
    my ($self) = @_;
    $self->add_extra_tables('lock');
    return $self->SUPER::setup_database;
}
110
# Rewrite a CREATE TABLE statement into a form this store accepts:
#   - drops UNSIGNED, maps VARBINARY(n) to bytea, TINYINT/MEDIUMINT to
#     SMALLINT, and "INT NOT NULL AUTO_INCREMENT" to SERIAL;
#   - turns "# " comment markers into "-- ";
#   - strips inline INDEX clauses when an INDEXES_<table> method exists;
#   - widens the fid column to BIGINT when fid_type says so.
# Dies if no table name can be found in the statement.
sub filter_create_sql {
    my ($self, $sql) = @_;

    for ($sql) {
        s/\bUNSIGNED\b//g;
        s/\bVARBINARY\(\d+\)/bytea/g;
        s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
        s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
        s/# /-- /g;
    }

    my $table;
    if ($sql =~ /create\s+table\s+(\S+)/i) {
        $table = $1;
    }
    die "didn't find table" unless $table;

    # Inline index definitions are removed only when a separate
    # INDEXES_<table> method is available on this object.
    my $index_method = 'INDEXES_' . $table;
    $sql =~ s!,\s*INDEX\s*(\w+)?\s*\(.+?\)!!mgi if $self->can($index_method);

    # Allow 64-bit ids for file IDs
    if ($self->fid_type eq "BIGINT") {
        $sql =~ s!\bfid\s+INT\b!fid BIGINT!i;
    }

    return $sql;
}
131
# CREATE TABLE statement for the 'file' table.
sub TABLE_file {
    return q{CREATE TABLE file (
    fid          INT NOT NULL,
    PRIMARY KEY  (fid),

    dmid         SMALLINT NOT NULL,
    dkey         VARCHAR(255),      -- domain-defined
    UNIQUE       (dmid, dkey),

    length       BIGINT,            -- big limit
    CHECK        (length >= 0),

    classid      SMALLINT NOT NULL,
    devcount     SMALLINT NOT NULL
    )};
}
148
# CREATE INDEX statement for the 'file' table.
sub INDEXES_file {
    return q{CREATE INDEX file_devcount ON file (dmid,classid,devcount)};
}
152
# CREATE INDEX statement for the 'unreachable_fids' table.
sub INDEXES_unreachable_fids {
    return q{CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)};
}
156
# CREATE INDEX statement for the 'file_on' table.
sub INDEXES_file_on {
    return q{CREATE INDEX file_on_devid ON file_on (devid)};
}
160
# CREATE TABLE statement for the 'host' table.
sub TABLE_host {
    return q{CREATE TABLE host (
    hostid          SMALLINT NOT NULL,
    PRIMARY KEY     (hostid),
    CHECK           (hostid >= 0),

    status          VARCHAR(8),
    CHECK           (status IN ('alive','dead','down')),

    http_port       INT DEFAULT 7500,
    CHECK           (http_port >= 0),
    CHECK           (http_port < 65536),

    http_get_port   INT,
    CHECK           (http_get_port >= 0),
    CHECK           (http_get_port < 65536),

    hostname        VARCHAR(40),
    UNIQUE          (hostname),
    hostip          VARCHAR(15),
    UNIQUE          (hostip),
    altip           VARCHAR(15),
    UNIQUE          (altip),
    altmask         VARCHAR(18)
    )};
}
187
# CREATE TABLE statement for the 'device' table.
# Note: the final CHECK has no comma before it, so it attaches to the
# mb_asof column definition rather than standing as a table constraint.
sub TABLE_device {
    return q{CREATE TABLE device (
    devid       SMALLINT NOT NULL,
    PRIMARY KEY (devid),
    CHECK       (devid >= 0),

    hostid      SMALLINT NOT NULL,

    status      VARCHAR(8),
    CHECK       (status IN ('alive','dead','down','readonly','drain')),
    weight      INT DEFAULT 100,

    mb_total    INT,
    CHECK       (mb_total >= 0),
    mb_used     INT,
    CHECK       (mb_used >= 0),
    mb_asof     INT
    CHECK       (mb_asof >= 0)
    )};
}
208
# CREATE INDEX statement for the 'device' table.
sub INDEXES_device {
    return q{CREATE INDEX device_status ON device (status)};
}
212
# CREATE INDEX statement for the 'file_to_replicate' table.
sub INDEXES_file_to_replicate {
    return q{CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)};
}
216
# CREATE INDEX statement for the 'file_to_delete2' table.
sub INDEXES_file_to_delete2 {
    return q{CREATE INDEX file_to_delete2_nexttry ON file_to_delete2 (nexttry)};
}
220
# CREATE INDEX statement for the 'file_to_delete_later' table.
sub INDEXES_file_to_delete_later {
    return q{CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)};
}
224
# CREATE INDEX statement for the 'fsck_log' table.
sub INDEXES_fsck_log {
    return q{CREATE INDEX fsck_log_utime ON fsck_log (utime)};
}
228
# CREATE INDEX statement for the 'file_to_queue' table.
sub INDEXES_file_to_queue {
    return q{CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)};
}
232
# Extra table: CREATE TABLE statement for the 'lock' table, which
# setup_database registers through add_extra_tables.
sub TABLE_lock {
    return q{CREATE TABLE lock (
    lockid      INT NOT NULL,
    PRIMARY KEY (lockid),
    CHECK       (lockid >= 0),

    hostname    VARCHAR(255) NOT NULL,

    pid         INT NOT NULL,
    CHECK       (pid >= 0),

    acquiredat  INT NOT NULL,
    CHECK       (acquiredat >= 0)
    )};
}
249
# Schema upgrade: add host.http_get_port when column_type reports that
# the column does not exist yet.
sub upgrade_add_host_getport {
    my ($self) = @_;
    my $present = $self->column_type("host", "http_get_port");
    if (!$present) {
        $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
    }
}
# Schema upgrade: add host.altip and host.altmask, plus a uniqueness
# constraint on altip, when the altip column does not exist yet.
sub upgrade_add_host_altip {
    my $self = shift;
    unless ($self->column_type("host", "altip")) {
        $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
        $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
        # PostgreSQL does not accept the MySQL form "ADD UNIQUE name (col)";
        # the constraint is left unnamed, matching UNIQUE (altip) in TABLE_host.
        $self->dowell("ALTER TABLE host ADD UNIQUE (altip)");
    }
}
266
# Schema upgrade: add device.mb_asof when column_type reports that the
# column does not exist yet.
sub upgrade_add_device_asof {
    my ($self) = @_;
    my $present = $self->column_type("device", "mb_asof");
    if (!$present) {
        $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
    }
}
273
# Schema upgrade: add device.weight (default 100) when column_type
# reports that the column does not exist yet.
sub upgrade_add_device_weight {
    my ($self) = @_;
    my $present = $self->column_type("device", "weight");
    if (!$present) {
        $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
    }
}
280
# Schema upgrade: make sure the CHECK constraint on device.status allows
# 'readonly'.
# PostgreSQL has no "ALTER TABLE ... MODIFY COLUMN", so the existing
# constraint (if one is found) is dropped and a new one named 'status' is
# added, inside one transaction.
sub upgrade_add_device_readonly {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("device", "status", \$cn);
    unless (defined $clause && $clause =~ /readonly/) {
        $self->dbh->begin_work;
        # Nothing to drop when no CHECK constraint was found on the column.
        $self->dowell("ALTER TABLE device DROP CONSTRAINT $cn") if defined $cn;
        $self->dowell("ALTER TABLE device ADD CONSTRAINT status CHECK(".
                      "status IN ('alive', 'dead', 'down', 'readonly'))");
        $self->dbh->commit;
    }
}
287
# Schema upgrade: make sure the CHECK constraint on device.status allows
# 'drain'.
# PostgreSQL has no "ALTER TABLE ... MODIFY COLUMN", so the existing
# constraint (if one is found) is dropped and a new one named 'status' is
# added, inside one transaction.
sub upgrade_add_device_drain {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("device", "status", \$cn);
    unless (defined $clause && $clause =~ /drain/) {
        $self->dbh->begin_work;
        # Nothing to drop when no CHECK constraint was found on the column.
        $self->dowell("ALTER TABLE device DROP CONSTRAINT $cn") if defined $cn;
        $self->dowell("ALTER TABLE device ADD CONSTRAINT status CHECK(".
                      "status IN ('alive', 'dead', 'down', 'readonly','drain'))");
        $self->dbh->commit;
    }
}
294
# Schema upgrade: make sure the CHECK constraint on host.status allows
# 'readonly'.  The existing constraint (if one is found) is dropped and a
# new one named 'status' is added, inside one transaction.
sub upgrade_add_host_readonly {
    my $self = shift;
    my $cn;
    my $clause = $self->column_constraint("host", "status", \$cn);
    unless (defined $clause && $clause =~ /\breadonly\b/) {
        $self->dbh->begin_work;
        # column_constraint leaves $cn unset when it finds no constraint;
        # issuing "DROP CONSTRAINT" without a name would be invalid SQL.
        $self->dowell("ALTER TABLE host DROP CONSTRAINT $cn") if defined $cn;
        $self->dowell("ALTER TABLE host ADD CONSTRAINT status CHECK(".
                      "status IN ('alive', 'dead', 'down', 'readonly'))");
        $self->dbh->commit;
    }
}
306
# Schema upgrade: convert server_settings.value to TEXT unless the column
# type already matches /text/i.
sub upgrade_modify_server_settings_value {
    my $self = shift;
    # The regex must be applied to the type returned by column_type, not
    # to the literal column name passed as its argument.
    my $type = $self->column_type("server_settings", "value");
    unless (defined $type && $type =~ /text/i) {
        $self->dowell("ALTER TABLE server_settings ALTER COLUMN value TYPE TEXT");
    }
}
313
# Schema upgrade: add file_to_queue.arg when column_type reports that the
# column does not exist yet.
sub upgrade_add_file_to_queue_arg {
    my ($self) = @_;
    my $present = $self->column_type("file_to_queue", "arg");
    if (!$present) {
        $self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT");
    }
}
320
# Nothing to do here: Postgres doesn't have, or never used, a MEDIUMINT
# for device.  Always returns 1.
sub upgrade_modify_device_size { 1 }
325
# Schema upgrade: add class.hashtype when column_type reports that the
# column does not exist yet.
sub upgrade_add_class_hashtype {
    my $self = shift;
    my $present = $self->column_type("class", "hashtype");
    if (!$present) {
        $self->dowell("ALTER TABLE class ADD COLUMN hashtype SMALLINT");
    }
}
332
# Queue each of @fidids in file_to_delete, one transaction per fid.
# A fid that is already queued (duplicate-key error) is silently skipped.
# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    # My kingdom for a real INSERT IGNORE implementation!
    my ($self, @fidids) = @_;
    my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";

    foreach my $fidid (@fidids) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval {
            $self->dbh->do($sql, undef, $fidid);
        };
        if ($@ || $self->dbh->err) {
            if ($self->was_duplicate_error) {
                # Do nothing
            } else {
                $self->condthrow;
            }
        }
        $self->dbh->commit;
    }

    # Honour the documented contract; previously nothing was returned.
    return 1;
}
356
# Queue each of @fidids in file_to_delete2 with nexttry set to the current
# time.  Emulates REPLACE in two passes: first try to INSERT every fid
# (one transaction each), then UPDATE nexttry for those whose INSERT hit a
# duplicate-key error.  Any other error is passed to condthrow.
sub enqueue_fids_to_delete2 {
    # My kingdom for a real REPLACE implementation!
    my ($self, @fidids) = @_;
    my $table      = 'file_to_delete2';
    my $insert_sql = sprintf "INSERT INTO %s (fid, nexttry) VALUES (?,%s)", $table, $self->unix_timestamp;
    my $update_sql = sprintf 'UPDATE %s SET nexttry = %s WHERE fid IN (?)', $table, $self->unix_timestamp;
    my @already_queued;

    # Pass 1: insert, remembering which fids were already present.
    for my $fid (@fidids) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval { $self->dbh->do($insert_sql, undef, $fid); };
        if ($@ || $self->dbh->err) {
            if (!$self->was_duplicate_error) {
                $self->condthrow;
            }
            else {
                push @already_queued, $fid;
            }
        }
        $self->dbh->commit;
    }

    # Pass 2: refresh nexttry on the rows that already existed.
    for my $fid (@already_queued) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval { $self->dbh->do($update_sql, undef, $fid); };
        if ($@ || $self->dbh->err) {
            # A duplicate error here is ignored; anything else is thrown.
            $self->condthrow unless $self->was_duplicate_error;
        }
        $self->dbh->commit;
    }

}
399
400# --------------------------------------------------------------------------
401# Functions specific to Store::Postgres subclass.  Not in parent.
402# --------------------------------------------------------------------------
403
# Run an INSERT and ignore duplicate-key failures.
# Named params: insert (SQL) and insert_vals (arrayref of bind values).
# Implemented by delegating to insert_or_update with the 'IGNORE' marker
# in place of an update statement.
sub insert_or_ignore {
    my ($self, @params) = @_;
    my %arg = $self->_valid_params([qw(insert insert_vals)], @params);
    my @delegated = (
        insert      => $arg{insert},
        insert_vals => $arg{insert_vals},
        update      => 'IGNORE',
        update_vals => 'IGNORE',
    );
    return $self->insert_or_update(@delegated);
}
414
# Try an INSERT; on a duplicate-key error fall back to an UPDATE.
# Named params: insert, insert_vals, update, update_vals.  Passing
# 'IGNORE' as update skips the fallback statement.
# The INSERT runs under a savepoint (named after the target table) so the
# transaction can be rolled back to it before the UPDATE is attempted.
# Returns 1; errors are raised through condthrow.
sub insert_or_update {
    my ($self, @params) = @_;
    my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @params);
    my $dbh = $self->dbh;

    # Derive the savepoint name from the table in "INSERT INTO <table> ...".
    (my $savepoint = $arg{insert}) =~ s/^INSERT INTO ([^\s]+).*$/$1/g;

    $dbh->begin_work;
    $dbh->do('SAVEPOINT '.$savepoint);
    eval { $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} }); };

    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            $dbh->do('ROLLBACK TO '.$savepoint);
            unless ($arg{update} eq "IGNORE") {
                $dbh->do($arg{update}, undef, @{ $arg{update_vals} });
            }
        }
        $self->condthrow;
    }

    $dbh->commit;
    return 1;
}
440
# Look up the data type of column $col in table $table via
# information_schema.columns.  Returns the data_type string of the first
# matching row, or undef when there is none.
sub column_type {
    my ($self, $table, $col) = @_;
    my $query = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
    $query->execute($table,$col);
    while (my $row = $query->fetchrow_hashref) {
        next unless $row->{column_name} eq $col;
        $query->finish;
        return $row->{data_type};
    }
    return undef;
}
453
# Look up a CHECK constraint that mentions column $col of table $table,
# via information_schema.  Returns the check clause of the first matching
# row, or undef when there is none.  When $cn (a scalar reference) is
# supplied, the constraint's name is stored through it.
sub column_constraint {
    my ($self, $table, $col, $cn) = @_;
    my $query = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause,constraint_name FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
    $query->execute($table,$col);
    while (my $row = $query->fetchrow_hashref) {
        next unless $row->{column_name} eq $col;
        $query->finish;
        if ($cn) {
            $$cn = $row->{constraint_name};
        }
        return $row->{check_clause};
    }
    return undef;
}
467
# Decide which SQL integer type is used for file IDs: "BIGINT" or "INT".
# The answer is cached in $self->{_fid_type}.  Order of precedence:
#   1. a previously cached value;
#   2. the MOG_FIDSIZE=big environment override;
#   3. the type of an existing file.fid column;
#   4. the default, BIGINT.
sub fid_type {
    my $self = shift;
    return $self->{_fid_type} if $self->{_fid_type};

    # let people force bigint mode with environment.
    if ($ENV{MOG_FIDSIZE} && $ENV{MOG_FIDSIZE} eq "big") {
        return $self->{_fid_type} = "BIGINT";
    }

    # else, check a maybe-existing table and see if we're in bigint
    # mode already.
    my $file_fid_type = $self->column_type("file", "fid");
    if($file_fid_type) {
        if ($file_fid_type =~ /bigint/i) {
            return $self->{_fid_type} = "BIGINT";
        } elsif($file_fid_type =~ /int/i) {
            # Old installs might not have raised the fid type size yet.
            return $self->{_fid_type} = "INT";
        }
    }

    # Used to default to 32bit ints, but this always bites people
    # a few years down the road. So default to 64bit.
    return $self->{_fid_type} = "BIGINT";
}
494
495# --------------------------------------------------------------------------
496# Test suite things we override
497# --------------------------------------------------------------------------
498
# Test-suite helper: drop any existing temporary database, recreate it by
# running mogdbsetup, and return a store connected to it.
# Recognised args (with defaults): dbname (tmp_mogiletest), dbhost
# (localhost), dbport (5432), dbuser (mogile), dbpass (''), dbrootuser
# (dbuser, else postgres), dbrootpass (dbpass, else '').
# Dies if mogdbsetup exits non-zero.
sub new_temp {
    my ($self, %args) = @_;

    my $dbname   = $args{dbname} || "tmp_mogiletest";
    my $host     = $args{dbhost} || 'localhost';
    my $port     = $args{dbport} || 5432;
    my $user     = $args{dbuser} || 'mogile';
    my $pass     = $args{dbpass} || '';
    my $rootuser = $args{dbrootuser} || $args{dbuser} || 'postgres';
    my $rootpass = $args{dbrootpass} || $args{dbpass} || '';

    _drop_db($dbname,$host,$port,$rootuser,$rootpass);

    # $FindBin::Bin is read here without this file loading FindBin itself.
    my @cmd = (
        "$FindBin::Bin/../mogdbsetup",
        "--yes",
        "--dbname=$dbname",
        "--type=Postgres",
        "--dbhost=$host",
        "--dbport=$port",
        "--dbuser=$user",
        "--dbrootuser=$rootuser",
    );
    # Password options are passed only when a password is set.
    push @cmd, "--dbpass=$pass"         if $pass ne '';
    push @cmd, "--dbrootpass=$rootpass" if $rootpass ne '';

    if (system(@cmd)) {
        die "Failed to run mogdbsetup (".join(' ',map { "'".$_."'" } @cmd).").";
    }

    my $dsn = "dbi:Pg:dbname=$dbname;host=$host;port=$port";
    return MogileFS::Store->new_from_dsn_user_pass($dsn, $user, $pass);
}
525
# Cached administrative connection shared by the test helpers.
my $rootdbh;

# Return a root DBI handle to the 'postgres' database, connecting on the
# first call and reusing that handle afterwards.  The arguments are only
# used by the call that actually connects.
# Dies if the connection cannot be established.
sub _root_dbh {
    my ($host, $port, $rootuser, $rootpass) = @_;
    return $rootdbh if $rootdbh;

    # "return EXPR or die" parses as "(return EXPR) or die", which made the
    # die unreachable; connect first, then return.
    $rootdbh = DBI->connect("DBI:Pg:dbname=postgres;host=$host;port=$port", $rootuser, $rootpass, { RaiseError => 1 })
        or die "Couldn't connect to local PostgreSQL database as $rootuser";
    return $rootdbh;
}
535
# Test-suite helper: drop database $dbname through the root connection.
# The DROP runs inside eval, so a failure there does not propagate.
sub _drop_db {
    my ($dbname, $host, $port, $rootuser, $rootpass) = @_;
    my $admin = _root_dbh($host, $port, $rootuser, $rootpass);
    eval {
        $admin->do("DROP DATABASE $dbname");
    };
}
547
548
549# --------------------------------------------------------------------------
550# Data-access things we override
551# --------------------------------------------------------------------------
552
# Change the key of file $fidid to $to_key.
# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # A duplicate-key failure is reported as 0; anything else is
        # re-thrown with the message captured by eval.
        return 0 if $self->was_duplicate_error;
        die $@;
    }
    $self->condthrow;
    return 1;
}
573
# add a record of fidid existing on devid
# returns 1 when the INSERT succeeded, 0 when it failed for any reason
# (typically because the row already exists)
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
    };

    if (!$@ && !$dbh->err) {
        return 1;
    }
    return 0;
}
586
# update the device count for a given fidid
# The file row is locked with SELECT ... FOR UPDATE and devcount is then
# recomputed from file_on, all in one transaction.  Returns 1 (after a
# rollback) when the SELECT reports zero rows; otherwise returns the
# result of the UPDATE.
sub update_devcount_atomic {
    my ($self, $fidid) = @_;

    $self->dbh->begin_work;
    my $locked = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
    $self->condthrow;
    if ($locked == 0) {
        # No such file row: nothing to update.
        $self->dbh->rollback;
        return 1;
    }

    my $updated = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
    $self->condthrow;
    $self->dbh->commit;
    $self->condthrow;
    return $updated;
}
605
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
# When $in is false the row gets nexttry = 0; otherwise nexttry is the
# current time plus $in seconds.  The INSERT runs inside eval, so a
# failure (for example an already-queued fid) does not propagate.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    my $dbh = $self->dbh;

    # Note: $in is interpolated into the SQL text, not bound.
    my $nexttry = $in ? $self->unix_timestamp . " + " . $in . "::int" : 0;
    my $insert  = "INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)";

    return eval {
        $dbh->do($insert, undef, $fidid, $from_devid);
    };
}
621
# reschedule all deferred replication, return number rescheduled
# (every row whose nexttry lies in the future is reset to the current time)
sub replicate_now {
    my ($self) = @_;
    my $now = $self->unix_timestamp;
    my $sql = "UPDATE file_to_replicate SET nexttry = " . $now . " WHERE nexttry > " . $now;
    return $self->dbh->do($sql);
}
627
# Push the next replication attempt for $fid to $in_n_secs seconds from
# now and increment its failcount.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = "
            . $self->unix_timestamp
            . " + ?, failcount = failcount + 1 WHERE fid = ?";
    $self->dbh->do($sql, undef, $in_n_secs, $fid);
}
633
# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.  dies if the insert fails for another reason.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # The new id is one past the current maximum (0 when the table is empty).
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $inserted = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    throw("dup") if $self->was_duplicate_error;

    if ($inserted) {
        return $maxid+1;
    }
    die "failed to make domain";  # FIXME: the above is racy.
}
652
# Store $val under $key in server_settings (insert, or update when the key
# already exists).  An undefined $val deletes the key instead.
# Returns 1; dies if the handle reports an error afterwards.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;

    if (!defined $val) {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }
    else {
        $self->insert_or_update(
            insert      => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
            insert_vals => [ $key, $val ],
            update      => "UPDATE server_settings SET value = ? WHERE field = ?",
            update_vals => [ $val, $key ],
        );
    }

    if ($dbh->err) {
        die "Error updating 'server_settings': " . $dbh->errstr;
    }
    return 1;
}
671
# Add $val (default 1) to the numeric server setting $key.
# The row is locked with SELECT ... FOR UPDATE inside a transaction, so the
# read-modify-write is race-safe.  If the stored value is not a plain
# non-negative integer it is logged and overwritten with $val.  When the
# setting is missing (or false), the lock is released and the value is
# created through set_server_setting.
# Returns nothing when $val is 0, 1 after a successful update, otherwise
# the result of set_server_setting.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    $self->dbh->begin_work;
    my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
    if($value) {
        if($value =~ /^\d+$/) {
            $value += $val;
        } else {
            # This file imports no warning() function (only throw, debug and
            # error from MogileFS::Util), so log through error() instead of
            # dying on an undefined subroutine mid-transaction.
            error("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
            $value = $val;
        }
        my $rv = $self->dbh->do("UPDATE server_settings ".
            "SET value=? ".
            "WHERE field=?", undef,
            $value, $key) > 0;
        $self->dbh->commit;
        return 1 if $rv;
    }
    $self->dbh->rollback; # Release the row-lock
    $self->set_server_setting($key, $val);
}
697
# Insert a new row into the device table.
# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;

    # conddup runs the insert for us (see the contract above for how a
    # duplicate devid is reported).
    my $insert = sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
                       $devid, $hostid, $status);
    };
    my $rows = $self->conddup($insert);
    $self->condthrow;

    unless ($rows > 0) {
        die "error making device $devid\n";
    }
    return 1;
}
709
# Insert a row into the file table, or update the existing row with the
# same fid.  Named params: fidid dmid key length classid devcount.
sub replace_into_file {
    my ($self, @params) = @_;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @params);

    # Every non-fid column, in the order both statements list them.
    my @columns = qw(dmid key length classid devcount);

    $self->insert_or_update(
        insert      => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, ?)",
        insert_vals => [ @arg{'fidid', @columns} ],
        update      => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=? WHERE fid=?",
        update_vals => [ @arg{@columns, 'fidid'} ],
    );
    $self->condthrow;
}
721
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
# One INSERT is executed per object through a single prepared statement.
# Duplicate-key errors are skipped; any other error goes to condthrow.
# Returns 1.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;

    my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
    foreach my $devfid (@devfids) {
        eval {
            $sth->execute($devfid->fidid, $devfid->devid);
        };
        $self->condthrow unless $self->was_duplicate_error;
    }
    return 1;
}
# Map a lock name to a non-negative 31-bit integer: the first four bytes
# of the name's MD5 digest read as a big-endian 32-bit number, with the
# top bit cleared.  Plain function (not a method).
# Croaks when $lockname is undefined or empty.
sub lockid {
    my ($lockname) = @_;
    unless (defined $lockname && length($lockname) > 0) {
        croak("Called with empty lockname! $lockname");
    }
    my ($word) = unpack 'N', md5($lockname);
    return $word & 0x7fffffff;
}
744
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# the lock should be unique in the space of (lockid), as well the space of
# (hostname,pid).
# The lock is a PostgreSQL advisory lock keyed on (lockid of the local
# hostname, lockid of the lock name); it is polled once per second.
# Dies when a lock is already recorded as held by this object, or when the
# server returns something other than 1 or 0.
# returns 1 on success and 0 on timeout
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    my $hostid = lockid(hostname);
    my $lockid = lockid($lockname);
    if ($self->{lock_depth}) {
        die sprintf("Lock recursion detected (grabbing %s on %s (%s/%s), had %s (%s). Bailing out.", $lockname, hostname, $hostid, $lockid, $self->{last_lock}, lockid($self->{last_lock}));
    }

    debug("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;

    my $got = undef;
    while ($timeout >= 0) {
        $got = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $hostid, $lockid);
        $self->condthrow;

        die "Something went horribly wrong while getting lock $lockname - undefined lock"
            unless defined $got;

        if ($got == 1) {
            # Acquired: remember it so a nested get_lock can be detected.
            $self->{lock_depth} = 1;
            $self->{last_lock}  = $lockname;
            last;
        }

        die "Something went horribly wrong while getting lock $lockname - unknown return value"
            unless $got == 0;

        # Not acquired: wait a second (unless this was the last try).
        sleep 1 if $timeout > 0;
        $timeout--;
    }
    return $got;
}
779
# attempt to release a lock of lockname.
# Uses pg_advisory_unlock with the same (hostname id, lock id) key pair
# that get_lock uses, and resets the recorded lock depth to 0.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    my $hostid = lockid(hostname);
    my $lockid = lockid($lockname);
    debug("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;

    my $released = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?, ?)", undef, $hostid, $lockid);
    if ($self->{lock_depth} != 0 and $released == 0 and $Mgd::DEBUG >= 2) {
        debug("Double-release of lock $lockname!");
    }
    $self->condthrow;

    $self->{lock_depth} = 0;
    return $released;
}
793
# Bind attributes used for binary (bytea) placeholders.
# The constant is called by its fully qualified name because this file
# loads DBD::Pg without importing its :pg_types constants.
sub BLOB_BIND_TYPE { return { pg_type => DBD::Pg::PG_BYTEA() }; }
795
# Store the checksum for $fidid: INSERT a new row, or UPDATE the existing
# one when the INSERT fails with a duplicate-key error.  The checksum is
# bound with BLOB_BIND_TYPE.
# Errors from the UPDATE or the COMMIT are raised through condthrow.
sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;

    $dbh->begin_work;
    # In PostgreSQL a failed statement aborts the whole transaction, so the
    # INSERT is guarded by a savepoint that can be rolled back to before the
    # fallback UPDATE runs (same approach as insert_or_update).
    $dbh->do('SAVEPOINT set_checksum');
    eval {
        my $sth = $dbh->prepare("INSERT INTO checksum " .
                                "(fid, hashtype, checksum) ".
                                "VALUES (?, ?, ?)");
        $sth->bind_param(1, $fidid);
        $sth->bind_param(2, $hashtype);
        $sth->bind_param(3, $checksum, BLOB_BIND_TYPE());
        $sth->execute;
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            $dbh->do('ROLLBACK TO set_checksum');
            eval {
                my $sth = $dbh->prepare("UPDATE checksum " .
                                        "SET hashtype = ?, checksum = ? " .
                                        "WHERE fid = ?");
                $sth->bind_param(1, $hashtype);
                $sth->bind_param(2, $checksum, BLOB_BIND_TYPE());
                $sth->bind_param(3, $fidid);
                $sth->execute;
            };
            $self->condthrow;
        }
    }
    $dbh->commit;
    $self->condthrow;
}
827
8281;
829
830__END__
831
832=head1 NAME
833
834MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
835
836=head1 SEE ALSO
837
838L<MogileFS::Store>
839
840
841