1package Data::ObjectDriver::Driver::GearmanDBI;
2use strict;
3use warnings;
4
5use base qw( Data::ObjectDriver );
6use Data::ObjectDriver::Iterator;
7use Storable();
8use Digest::MD5;
9use Data::Dumper;
10
## Configuration accessors, described by how this file uses them.
__PACKAGE__->mk_accessors(
    'dbi',              # wrapped driver: builds the SQL for search() and
                        # handles every other method
    'client',           # object whose do_task() submits the job
    'func',             # function name handed to do_task()
    'driver_arg',       # sent along in the frozen job payload
    'enabled_cb',       # called with search()'s arguments; a false result
                        # makes search() fall back to dbi
    'uniqify_cb',       # optional; called with ($sql, $bind) to build the
                        # job key (defaults to _md5sum)
    'on_exception_cb',  # passed as do_task()'s on_exception option if true
    'retry_count',      # passed as do_task()'s retry_count option if true
    'timeout',          # passed as do_task()'s timeout option if true
);
15
## Populate the driver from its constructor parameters.
##
## The parameter list is read as name => value pairs; each name is called
## as a method on the driver with the matching value as its only argument.
sub init {
    my ($driver, %param) = @_;

    while (my ($name, $value) = each %param) {
        $driver->$name($value);
    }
}
24
## Search for objects of $class matching $terms / $args.
##
## Dies when $Data::ObjectDriver::RESTRICT_IO is true. When the enabled_cb
## callback returns false for ($class, $terms, $args) the call is handed to
## the wrapped dbi driver unchanged. Otherwise the dbi driver only prepares
## the statement, the query itself goes through _gearman_search(), and the
## returned rows are loaded into objects with load_object_from_rec().
##
## Returns the list of objects in list context, and a
## Data::ObjectDriver::Iterator over them otherwise.
sub search {
    my $driver = shift;
    my ($class, $terms, $args) = @_;

    die "Attempted GearmanDBI I/O while in restricted mode: search() "
        . Dumper($terms, $args)
        if $Data::ObjectDriver::RESTRICT_IO;

    my $dbi = $driver->dbi;

    ## When Gearman should not be used, fall back to the configured dbi
    ## driver.
    if (! $driver->enabled_cb->(@_)) {
        return $dbi->search(@_);
    }

    my ($sql, $bind, $stmt) = $dbi->prepare_fetch($class, $terms, $args);
    my $rows = $driver->_gearman_search($sql, $bind);

    ## Gearman returns each row as an array in select order; turn it into
    ## the hash (keyed through the statement's select map) that
    ## load_object_from_rec() is given.
    my $column_for = $stmt->select_map;
    my @selected   = @{ $stmt->select };

    my @records = map {
        my $row = $_;
        my $rec;
        for my $idx (0 .. $#selected) {
            $rec->{ $column_for->{ $selected[$idx] } } = $row->[$idx];
        }
        $rec;
    } @$rows;

    my $skip_triggers = $args->{no_triggers};
    my @objs = map { $dbi->load_object_from_rec($class, $_, $skip_triggers) }
               @records;

    return @objs if wantarray;
    return Data::ObjectDriver::Iterator->new( sub { shift @objs } );
}
66
## Run a prepared query through Gearman and return its rows.
##
## $sql and $bind are the statement text and its bind values. A key is
## computed for the pair (uniqify_cb when set, _md5sum otherwise) and a
## Storable-frozen hash holding driver_arg, sql, bind and key is submitted
## to the configured function via the client's do_task(). The same key is
## passed as the "uniq" option so that requests for the same data coalesce.
##
## Returns the thawed reply, or an empty array reference when do_task()
## returns a false value.
sub _gearman_search {
    my ($driver, $sql, $bind) = @_;

    my $make_key = $driver->uniqify_cb || \&_md5sum;
    my $func     = $driver->func;
    my $key      = $make_key->($sql, $bind);
    my $client   = $driver->client;

    ## Optional do_task() settings are forwarded only when they hold a
    ## true value.
    my %task_opts = ( uniq => $key );
    for my $pair (
        [ on_exception => 'on_exception_cb' ],
        [ retry_count  => 'retry_count' ],
        [ timeout      => 'timeout' ],
    ) {
        my ($option, $accessor) = @$pair;
        next unless $driver->$accessor;
        $task_opts{$option} = $driver->$accessor;
    }

    my $frozen = Storable::nfreeze( {
        driver_arg => $driver->driver_arg,
        sql        => $sql,
        bind       => $bind,
        key        => $key,
    } );

    my $reply = $client->do_task( $func, \$frozen, \%task_opts );

    return [] unless $reply;
    return Storable::thaw($$reply);
}
98
## Default key generator: an MD5 hex digest identifying a query.
##
## $sql is the statement text and $bind an array reference of bind values
## (an undefined $bind is treated as an empty list). Returns a 32-character
## lowercase hex string.
##
## The digest input is built so that different queries cannot produce the
## same input:
##   * every component is prefixed with its length, so binds ('1', '23')
##     and ('12', '3') no longer concatenate to the same string;
##   * undef (a NULL bind) gets its own marker, which keeps it distinct
##     from the empty string and avoids "uninitialized" warnings;
##   * the input is UTF-8 encoded first, because md5_hex() croaks on
##     strings holding wide characters.
sub _md5sum {
    my ($sql, $bind) = @_;

    my $payload = join '',
        map { defined $_ ? length($_) . ':' . $_ : 'u' }
        $sql, @{ $bind || [] };

    utf8::encode($payload);
    return Digest::MD5::md5_hex($payload);
}
103
## Apart from search(), every data access method is handed straight to the
## wrapped dbi driver: same arguments, same calling context, and the dbi
## driver's return value is returned untouched.
sub lookup {
    my $driver = shift;
    return $driver->dbi->lookup(@_);
}

sub lookup_multi {
    my $driver = shift;
    return $driver->dbi->lookup_multi(@_);
}

sub exists {
    my $driver = shift;
    return $driver->dbi->exists(@_);
}

sub insert {
    my $driver = shift;
    return $driver->dbi->insert(@_);
}

sub replace {
    my $driver = shift;
    return $driver->dbi->replace(@_);
}

sub update {
    my $driver = shift;
    return $driver->dbi->update(@_);
}

sub remove {
    my $driver = shift;
    return $driver->dbi->remove(@_);
}

sub fetch_data {
    my $driver = shift;
    return $driver->dbi->fetch_data(@_);
}
114
## Transaction and handle methods are likewise passed on to the wrapped
## dbi driver, and its return value is returned untouched.
sub add_working_driver {
    my $driver = shift;
    return $driver->dbi->add_working_driver(@_);
}

sub commit {
    my $driver = shift;
    return $driver->dbi->commit(@_);
}

sub rollback {
    my $driver = shift;
    return $driver->dbi->rollback(@_);
}

sub rw_handle {
    my $driver = shift;
    return $driver->dbi->rw_handle(@_);
}

sub r_handle {
    my $driver = shift;
    return $driver->dbi->r_handle(@_);
}
121
## Safety net for the remaining, non-core methods: any method not defined
## in this package is called on the wrapped dbi driver with the same
## arguments.

## Defined (as a no-op) so that object destruction never reaches AUTOLOAD.
sub DESTROY { return }

sub AUTOLOAD {
    my $driver = shift;

    ## $AUTOLOAD holds the fully qualified name of the missing method;
    ## keep only the part after the last "::".
    our $AUTOLOAD;
    my $meth = $AUTOLOAD;
    $meth =~ s/^.*:://;

    return $driver->dbi->$meth(@_);
}
129
1301;
131