1package Data::ObjectDriver::Driver::GearmanDBI; 2use strict; 3use warnings; 4 5use base qw( Data::ObjectDriver ); 6use Data::ObjectDriver::Iterator; 7use Storable(); 8use Digest::MD5; 9use Data::Dumper; 10 11__PACKAGE__->mk_accessors(qw( 12 dbi client func driver_arg enabled_cb uniqify_cb 13 on_exception_cb retry_count timeout 14)); 15 16sub init { 17 my $driver = shift; 18 my %param = @_; 19 20 for my $key (keys %param) { 21 $driver->$key($param{$key}); 22 } 23} 24 25sub search { 26 my $driver = shift; 27 my($class, $terms, $args) = @_; 28 29 if ($Data::ObjectDriver::RESTRICT_IO) { 30 die "Attempted GearmanDBI I/O while in restricted mode: search() " . Dumper($terms, $args); 31 } 32 33 my $dbi = $driver->dbi; 34 35 ## if Gearman shouldn't be used, fallback to the configured dbi driver 36 return $dbi->search(@_) 37 unless $driver->enabled_cb->(@_); 38 39 my ($sql, $bind, $stmt) = $dbi->prepare_fetch($class, $terms, $args); 40 my $results = $driver->_gearman_search($sql, $bind); 41 42 ## Transform the array returned by gearman to the hash we expect to load 43 ## in the object 44 my $map = $stmt->select_map; 45 my @select = @{ $stmt->select }; 46 47 my $to_hash = sub { 48 my $array = shift; 49 my $hash; 50 my $i = 0; 51 for my $col (@select) { 52 $hash->{ $map->{$col} } = $array->[$i++]; 53 } 54 return $hash; 55 }; 56 57 my $nt = $args->{no_triggers}; 58 my @objs = map { $dbi->load_object_from_rec($class, $_, $nt); } 59 map { $to_hash->($_) } 60 @$results; 61 62 return wantarray 63 ? @objs 64 : Data::ObjectDriver::Iterator->new( sub { shift @objs } ); 65} 66 67sub _gearman_search { 68 my $driver = shift; 69 my ($sql, $bind) = @_; 70 71 my $uniqify = $driver->uniqify_cb || \&_md5sum; 72 my $func = $driver->func; 73 my $uniq = $uniqify->($sql, $bind); 74 my $client = $driver->client; 75 76 my %options = (); 77 $options{on_exception} = $driver->on_exception_cb 78 if $driver->on_exception_cb; 79 $options{retry_count} = $driver->retry_count 80 if $driver->retry_count; 81 $options{timeout} = $driver->timeout 82 if $driver->timeout; 83 84 my $res = $client->do_task( $func => 85 \Storable::nfreeze( { 86 driver_arg => $driver->driver_arg, 87 sql => $sql, 88 bind => $bind, 89 key => $uniq, 90 } ), 91 { 92 uniq => $uniq, # coalesce all requests for this data 93 %options, 94 } 95 ); 96 return $res ? Storable::thaw($$res) : []; 97} 98 99sub _md5sum { 100 my ($sql, $bind) = @_; 101 return Digest::MD5::md5_hex(join "", $sql, @$bind); 102} 103 104## every single data access methods are delegated to dbi 105## except for search 106sub lookup { shift->dbi->lookup (@_) } 107sub lookup_multi { shift->dbi->lookup_multi (@_) } 108sub exists { shift->dbi->exists (@_) } 109sub insert { shift->dbi->insert (@_) } 110sub replace { shift->dbi->replace (@_) } 111sub update { shift->dbi->update (@_) } 112sub remove { shift->dbi->remove (@_) } 113sub fetch_data { shift->dbi->fetch_data (@_) } 114 115## transactions are passed to dbi 116sub add_working_driver { shift->dbi->add_working_driver (@_) } 117sub commit { shift->dbi->commit (@_) } 118sub rollback { shift->dbi->rollback (@_) } 119sub rw_handle { shift->dbi->rw_handle (@_) } 120sub r_handle { shift->dbi->r_handle (@_) } 121 122## safety AUTOLOAD for the rest of non-core methods 123sub DESTROY { } 124sub AUTOLOAD { 125 my $driver = shift; 126 (my $meth = our $AUTOLOAD) =~ s/^.*:://; 127 return $driver->dbi->$meth(@_); 128} 129 1301; 131