package POE::Component::DBIAgent;

# {{{ POD

=head1 NAME

POE::Component::DBIAgent - POE Component for running asynchronous DBI calls.

=head1 SYNOPSIS

 sub _start {
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    $heap->{helper} = POE::Component::DBIAgent->new( DSN => [$dsn,
					       $username,
					       $password
					      ],
				       Queries => $self->make_queries,
				       Count => 3,
				       Debug => 1,
				     );

	# Queries takes a hashref of the form:
	# { query_name => 'select blah from table where x = ?',
	#   other_query => 'select blah_blah from big_view',
	#   etc.
	# }

    $heap->{helper}->query(query_name =>
			   { cookie => 'starting_query' },
			   session => 'get_row_from_dbiagent');

 }

 sub get_row_from_dbiagent {
    my ($kernel, $self, $heap, $row, $cookie) = @_[KERNEL, OBJECT, HEAP, ARG0, ARG1];
    if ($row ne 'EOF') {

 # {{{ PROCESS A ROW

	#row is a listref of columns

 # }}} PROCESS A ROW

    } else {

 # {{{ NO MORE ROWS

	#cleanup code here

 # }}} NO MORE ROWS

    }

 }


=head1 DESCRIPTION

DBIAgent is your answer to non-blocking DBI in POE.

It fires off a configurable number of child processes (defaults to 3)
and feeds database queries to them via two-way pipes (or sockets,
depending on how POE::Wheel::Run manages them).  The primary method
is C<query>.

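The helper architecture described above can be sketched in plain Perl.
This is a simplified illustration, not DBIAgent's implementation: it
assumes a single child, and uses core C<pipe>, C<fork> and Storable's
C<nstore_fd>/C<fd_retrieve> in place of POE::Wheel::Run and
POE::Filter::Reference.  The C<fake_rows> function is a hypothetical
stand-in for a real DBI query.

```perl

 use strict;
 use warnings;
 use IO::Handle;
 use Storable qw(nstore_fd fd_retrieve);

 # Hypothetical stand-in for a DBI statement: returns rows as arrayrefs.
 sub fake_rows { my ($n) = @_; return map { [ $_, "name$_" ] } 1 .. $n }

 # One pipe per direction gives a two-way channel to the child.
 pipe(my $to_child_r,  my $to_child_w)  or die "pipe: $!";
 pipe(my $to_parent_r, my $to_parent_w) or die "pipe: $!";
 $_->autoflush(1) for $to_child_w, $to_parent_w;

 my $pid = fork();
 die "fork: $!" unless defined $pid;

 if ($pid == 0) {
     # Child ("helper"): read one request, reply once per row, then 'EOF'.
     close $to_child_w; close $to_parent_r;
     my $req = fd_retrieve($to_child_r);
     nstore_fd({ id => $req->{id}, data => $_ }, $to_parent_w) for fake_rows($req->{rows});
     nstore_fd({ id => $req->{id}, data => 'EOF' }, $to_parent_w);
     exit 0;
 }

 # Parent: send one request, then collect replies until 'EOF'.
 close $to_child_r; close $to_parent_w;
 nstore_fd({ id => 1, query => 'employee_record', rows => 2 }, $to_child_w);

 my @rows;
 while (1) {
     my $reply = fd_retrieve($to_parent_r);
     last if !ref $reply->{data} && $reply->{data} eq 'EOF';
     push @rows, $reply->{data};
 }
 waitpid($pid, 0);
 print scalar(@rows), " rows received\n";

```

In DBIAgent itself, POE::Wheel::Run does the forking and pipe plumbing,
and replies arrive as C<db_reply> events instead of through a blocking
read loop.
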
=head2 Usage

After initializing a DBIAgent and storing it in a session's heap, one
executes a C<query> with the query name, destination session (name or
id) and destination state (as well as an optional hashref of options
and any query parameters) as arguments.  As each row of data comes
back from the query, the destination state (in the destination
session) is invoked with that row of data in its C<$_[ARG0]> slot.  When
there are no more rows to return, the data in C<$_[ARG0]> is the string
'EOF'.

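A receiving state therefore has to tell row replies apart from the
final 'EOF'.  The sketch below is plain Perl with no POE involved:
C<handle_reply> is a hypothetical stand-in for a destination state, and
the hard-coded list plays the role of the C<$_[ARG0]> values that
DBIAgent would deliver one event at a time.

```perl

 use strict;
 use warnings;

 my (@collected, $finished);

 # Hypothetical destination state: $row stands in for $_[ARG0],
 # $cookie for $_[ARG1].
 sub handle_reply {
     my ($row, $cookie) = @_;
     if (ref $row) {
         push @collected, $row;    # one arrayref of columns per call
     }
     elsif ($row eq 'EOF') {
         $finished = $cookie;      # no more rows for this query
     }
 }

 # Simulated deliveries: two rows, then the terminating 'EOF'.
 handle_reply($_, 'starting_query') for [1, 'alice'], [2, 'bob'], 'EOF';

 printf "%d rows, finished: %s\n", scalar @collected, $finished;

```

Testing C<ref $row> first avoids comparing an arrayref against the
string 'EOF'.
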
Not EVERY query should run through the DBIAgent.  If you need to run a
short lookup from within a state, it can be a hassle to define a whole
separate state to receive its value and resume processing from there.
The determining factor, of course, is how long your query will take to
execute.  If you are trying to retrieve one row from a properly indexed
table, use C<$dbh-E<gt>selectrow_array()>.  If there's a join involved,
or multiple rows, or a view, you probably want to use DBIAgent.  If it's
a longish query and startup costs (time) don't matter to you, go ahead
and do it inline, but remember that your whole program is suspended
while it waits for the result.  If startup costs DO matter, use DBIAgent.

=head2 Return Values

The destination state in the destination session (specified in the
call to C<query()>) will receive the return values from the query in
its C<$_[ARG0]> parameter.  DBIAgent invokes DBI's C<fetch> method
internally, so the value will be a reference to an array (unless you
pass the C<hash> option to C<query()>).  If your query returns multiple
rows, then your state will be invoked multiple times, once per row.
B<ADDITIONALLY>, your state will be called one time with C<$_[ARG0]>
containing the string 'EOF'. 'EOF' is returned I<even if the query
doesn't return any other rows>.  This is also what to expect for DML
(INSERT, UPDATE, DELETE) queries.  A way to utilise this might be as
follows:

 sub some_state {
     my $heap = $_[HEAP];
     #...
     if ($enough_values_to_begin_updating) {

	 $heap->{dbiagent}->query(update_values =>
				  this_session =>
				  update_next_value =>
				  shift @{$heap->{values_to_be_updated}}
				 );
     }
 }

 sub update_next_value {
     my ($self, $heap) = @_[OBJECT, HEAP];
     # we got 'EOF' in ARG0 here but we don't care... we know that an
     # update has been executed.

     for (1..3) {		# Do three at a time!
	 my $value;
	 last unless defined ($value = shift @{$heap->{values_to_be_updated}});
	 $heap->{dbiagent}->query(update_values =>
				  this_session =>
				  update_next_value =>
				  $value
				 );
     }

 }

=cut

# }}} POD

#use Data::Dumper;
use Storable qw/freeze thaw/;
use Carp;

use strict;
use POE qw/Session Filter::Reference Wheel::Run Component::DBIAgent::Helper Component::DBIAgent::Queue/;

use vars qw/$VERSION/;

$VERSION = sprintf("%d.%02d", q$Revision: 0.26 $ =~ /(\d+)\.(\d+)/);

use constant DEFAULT_KIDS => 3;

sub debug { $_[0]->{debug} }
#sub debug { 1 }
#sub debug { 0 }

#sub carp { warn @_ }
#sub croak { die @_ }

# {{{ new

=head2 new()

Creating an instance creates a POE::Session to manage communication
with the Helper processes.  Queue management is transparent and
automatic.  The constructor is named C<new()> (surprised, eh?  Yeah,
me too).  The parameters are as follows:

=over

=item DSN

An arrayref of parameters to pass to C<DBI-E<gt>connect> (usually a
dsn, username, and password).

=item Queries

A hashref of the form Query_Name => "$SQL".  For example:

 {
   sysdate => "select sysdate from dual",
   employee_record => "select * from emp where id = ?",
   increase_inventory => "update inventory
                          set count = count + ?
                          where item_id = ?",
 }

As the example indicates, DBI placeholders are supported, as are DML
statements.

=item Count

The number of helper processes to spawn.  Defaults to 3.  The optimal
value for this parameter will depend on several factors, such as: how
many different queries your program will be running, how much RAM you
have, how often you run queries, and most importantly, how many
queries you intend to run I<simultaneously>.

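=item ErrorState

An arrayref containing a session and an event name to receive error
messages written to STDERR by the helper processes (for example, DBI
errors).  The message arrives in ARG0.

=item Debug

If true, DBIAgent writes diagnostic messages to STDERR.  Defaults to 0.

=back
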
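The interaction between C<Count> and queued queries can be sketched in
plain Perl.  This only mirrors the bookkeeping in C<query()> and
C<db_reply> (an active counter capped at C<Count>, plus a FIFO of
pending queries); the C<submit> and C<complete> functions are
hypothetical, and no processes or POE sessions are involved.

```perl

 use strict;
 use warnings;

 # Hypothetical bookkeeping: at most `count` queries active, the rest queued.
 my %agent = (count => 3, active => 0, pending => [], sent => []);

 # Hypothetical: dispatch at once if a helper is free, otherwise queue.
 sub submit {
     my ($st, $query) = @_;
     if ($st->{active} < $st->{count}) {
         $st->{active}++;
         push @{ $st->{sent} }, $query;    # DBIAgent calls put() on a helper wheel here
     }
     else {
         push @{ $st->{pending} }, $query;
     }
 }

 # Hypothetical: called on 'EOF'; frees a helper and starts the next queued query.
 sub complete {
     my ($st) = @_;
     $st->{active}--;
     submit($st, shift @{ $st->{pending} })
         if @{ $st->{pending} } && $st->{active} < $st->{count};
 }

 submit(\%agent, "q$_") for 1 .. 5;    # q1..q3 dispatched, q4 and q5 queued
 complete(\%agent);                    # q4 takes the freed helper

 printf "active=%d pending=%d sent=%s\n",
     $agent{active}, scalar @{ $agent{pending} }, join(',', @{ $agent{sent} });

```

With C<< Count => 3 >>, the fourth and fifth queries wait until an
earlier query's 'EOF' frees a helper.
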
=cut

sub new {
    my $type = shift;

    croak "$type needs an even number of parameters" if @_ & 1;
    my %params = @_;

    my $dsn = delete $params{DSN};
    croak "$type needs a DSN parameter" unless defined $dsn;
    croak "DSN needs to be an array reference" unless ref $dsn eq 'ARRAY';

    my $queries = delete $params{Queries};
    croak "$type needs a Queries parameter" unless defined $queries;
    croak "Queries needs to be a hash reference" unless ref $queries eq 'HASH';

    my $count = delete $params{Count} || DEFAULT_KIDS;
    #croak "$type needs a Count parameter" unless defined $queries;

    # croak "Queries needs to be a hash reference" unless ref $queries eq 'HASH';

    my $debug = delete $params{Debug} || 0;
    # $count = 1 if $debug;

    my $errorstate = delete $params{ErrorState} || undef;

    # Make sure the user didn't pass in parameters we're not aware of.
    if (scalar keys %params) {
	carp( "unknown parameters in $type constructor call: ",
	      join(', ', sort keys %params)
	    );
    }
    my $self = bless {}, $type;

    $self->{dsn} = $dsn;
    $self->{queries} = $queries;
    $self->{count} = $count;
    $self->{debug} = $debug;
    $self->{errorstate} = $errorstate;
    $self->{finish} = 0;
    $self->{pending_query_count} = 0;
    $self->{active_query_count} = 0;
    $self->{cookies} = [];
    $self->{group_cache} = [];

#     POE::Session->new( $self,
# 		       [ qw [ _start _stop db_reply remote_stderr error ] ]
# 		     );

    POE::Session->create( object_states =>
                          [ $self => [ qw [ _start _stop db_reply remote_stderr error ] ] ]
                        );

    return $self;

}

# }}} new

# {{{ query

# {{{ POD

=head2 query(I<$query_name>, [ \%args, ] I<$session>, I<$state>, [ I<@parameters> ])

The C<query()> method takes at least three parameters (the query name,
the destination session and the destination state), plus an optional
hashref of arguments and any bind values for the specific query you
are executing.

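Because C<\%args> is optional and sits in the second position,
C<query()> has to check whether that argument is a reference before
deciding which value is the session.  A minimal sketch of that shifting
logic in plain Perl follows; C<parse_query_args> is a hypothetical name,
it returns a hashref instead of dispatching anything, and it dies on a
non-HASH reference where the module only carps.

```perl

 use strict;
 use warnings;

 # Hypothetical helper mirroring the argument handling in query().
 sub parse_query_args {
     my ($query, $session, $state, @params) = @_;
     my $options = {};
     if (ref $session) {
         die "options must be a HASH reference\n" unless ref $session eq 'HASH';
         $options = $session;
         # The hashref took the session's slot: shift everything left by one.
         ($session, $state) = ($state, shift @params);
     }
     return { query => $query, session => $session, state => $state,
              params => \@params, %$options };
 }

 my $plain = parse_query_args(employee_record => 'main' => 'got_row', 42);
 my $fancy = parse_query_args(employee_record => { cookie => 'c1', group => 10 },
                              'main' => 'got_row', 42);

 print "$fancy->{session}/$fancy->{state} params=@{ $fancy->{params} } cookie=$fancy->{cookie}\n";

```

Both calls resolve to the same session, state and bind values; the
second one additionally carries the C<cookie> and C<group> options.
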
=over

=item $query_name

This parameter must be one of the keys to the Queries hashref you
passed to the constructor.  It is used to indicate which query you
wish to execute.

=item \%args

This is an OPTIONAL hashref of arguments to pass to the query.

Currently supported arguments:

=over 4

=item hash

Return rows as hash references instead of array references.

=item cookie

A cookie to pass to this query.  This is passed back unchanged to the
destination state in C<$_[ARG1]>.  Can be any scalar (including
references, and even POE postbacks, so be careful!).  You can use this
as an identifier if you have one destination state handling multiple
different queries or sessions.

=item delay

Insert a 1ms delay between each row of output.

I know what you're thinking: "WHY would you want to slow down query
responses?!?!?"  It has to do with CONCURRENCY.  When a response
(finally) comes in from the agent after running the query, it floods
the input channel with response data.  This has the effect of
monopolizing POE's attention, so that any other handles (network
sockets, pipes, file descriptors) keep getting pushed further back on
the queue, and to all other processes EXCEPT the agent, your POE
program looks hung for the amount of time it takes to process all of
the incoming query data.

So, we insert 1ms of time via Time::HiRes's C<usleep> function.  In
human terms, this is essentially negligible.  But it is just enough
time to allow competing handles (sockets, files) to trigger
C<select()>, and get handled by the POE::Kernel, in situations where
concurrency has priority over transfer rate.

Naturally, the Time::HiRes module is required for this functionality.
If Time::HiRes is not installed, the delay is ignored.

=item group

Posts the return event only once every C<group> rows, to avoid event
spam when selecting lots of rows.  NB: using C<group> means that
C<$_[ARG0]> will be an arrayref of rows, not a single row.  Any rows
left over are posted together with 'EOF', so the final batch is an
arrayref whose last element is the string 'EOF' (rather than the plain
string 'EOF' you would otherwise receive).

=back

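The C<delay> and C<group> options both shape how replies are
delivered.  The plain-Perl sketch below is an illustration under
assumptions, not DBIAgent's code: C<deliver> is a hypothetical
function, the callback stands in for posting to the destination state,
and Time::HiRes is loaded optionally so that the delay is skipped when
the module is missing, as described above.

```perl

 use strict;
 use warnings;

 # Load Time::HiRes if available; otherwise the delay is silently skipped.
 my $have_hires = eval { require Time::HiRes; 1 };

 # Hypothetical: relay rows (ending with 'EOF') to $callback, optionally
 # batching them and sleeping 1ms before each row.
 sub deliver {
     my ($rows, $opts, $callback) = @_;
     my @batch;
     for my $row (@$rows) {
         Time::HiRes::usleep(1_000) if $opts->{delay} && $have_hires;  # 1ms
         if (!$opts->{group}) {
             $callback->($row);
             next;
         }
         push @batch, $row;
         if (@batch == $opts->{group} || (!ref $row && $row eq 'EOF')) {
             $callback->([@batch]);   # post a copy, then start a new batch
             @batch = ();
         }
     }
 }

 my @events;
 deliver([ [1], [2], [3], [4], [5], 'EOF' ], { group => 2, delay => 1 },
         sub { push @events, $_[0] });

 # Three events: ([1],[2]), ([3],[4]), ([5],'EOF')
 print scalar(@events), " events\n";

```

With C<group>, the receiving state has to look for 'EOF' as the last
element of each batch rather than in C<$_[ARG0]> itself.
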
=item $session, $state

These parameters indicate the POE state that is to receive the data
returned from the database.  The state indicated will receive the data
in its C<$_[ARG0]> parameter.  I<PLEASE> make sure this is a valid
state, otherwise you will spend a LOT of time banging your head
against the wall wondering where your query data is.

=item @parameters

These are any parameters your query requires.  B<WARNING:> You must
supply exactly as many parameters as your query has placeholders!
This means that if your query has NO placeholders, then you should
pass NO extra parameters to C<query>.

Suggestions to improve this syntax are welcome.

=back

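Because a mismatch between placeholders and bind values presumably
surfaces only as a DBI error inside a helper process, it can help to
check the count on the calling side.  A rough sketch, assuming a
hypothetical C<check_binds> helper that simply counts C<?> characters;
it does not understand quoted string literals or SQL comments, so a
C<?> inside C<'...'> would be miscounted.

```perl

 use strict;
 use warnings;

 my %queries = (
     sysdate            => "select sysdate from dual",
     employee_record    => "select * from emp where id = ?",
     increase_inventory => "update inventory set count = count + ? where item_id = ?",
 );

 # Hypothetical: naive placeholder count (ignores quoting and comments).
 sub check_binds {
     my ($name, @params) = @_;
     my $sql = $queries{$name} or die "unknown query '$name'\n";
     my $wanted = () = $sql =~ /\?/g;
     die "$name expects $wanted bind values, got " . scalar(@params) . "\n"
         unless $wanted == @params;
     return $wanted;
 }

 check_binds(sysdate => ());                   # no placeholders, no parameters
 check_binds(increase_inventory => 5, 1234);   # two placeholders, two values
 my $err = eval { check_binds(employee_record => ()); 1 } ? '' : $@;
 print "error: $err";

```
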
=cut

# }}} POD

sub query {
    my ($self, $query, $package, $state, @rest) = @_;
    my $options = {};

    if (ref $package) {
	unless (ref $package eq 'HASH') {
	    carp "Options hash must be a HASH reference";
	}
	$options = $package;

	# The options hashref took the session's slot, so shift the
	# remaining arguments left by one: $package gets the old $state,
	# and $state gets the first element of @rest.
	($package, $state) = ($state, shift @rest);
    }

    # warn "QD: Running $query";

    my $agent = $self->{helper}->next;
    my $input = { query => $query,
		  package => $package, state => $state,
		  params => \@rest,
		  delay => 0,
		  id => "_",
		  %$options,
		};

    $self->{pending_query_count}++;
    if ($self->{active_query_count} < $self->{count} ) {

	$input->{id} = $agent->ID;
	$self->{cookies}[$input->{id}] = delete $input->{cookie};
	$agent->put( $input );
	$self->{active_query_count}++;
	$self->{group_cache}[$input->{id}] = [];

    } else {
	push @{$self->{pending_queries}}, $input;
    }

    $self->debug
      && warn sprintf("QA:(#%s) %d pending: %s => %s, return %d rows at once\n",
		      $input->{id}, $self->{pending_query_count},
		      $input->{query},
		      "$input->{package}::$input->{state}",
		      $input->{group} || 1,
		     );

}

# }}} query

#========================================================================================
# {{{ shutdown

=head2 finish()

The C<finish()> method tells DBIAgent that the program is finished
sending queries.  DBIAgent will shut its helpers down gracefully after
they complete any pending queries.  If there are no pending queries,
the DBIAgent will shut down immediately.

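The shutdown rule amounts to: stop now if nothing is pending,
otherwise set a flag and stop when the last query's 'EOF' arrives.  A
plain-Perl sketch of that bookkeeping, with hypothetical
C<finish_requested> and C<query_done> functions and a C<shut_down> flag
standing in for calling C<exit_all()> on the helper queue:

```perl

 use strict;
 use warnings;

 my %state = (pending => 2, finish => 0, shut_down => 0);

 # Hypothetical counterpart of finish().
 sub finish_requested {
     my ($s) = @_;
     $s->{finish} = 1;
     $s->{shut_down} = 1 unless $s->{pending};   # nothing left: stop now
 }

 # Hypothetical counterpart of the 'EOF' branch in db_reply.
 sub query_done {
     my ($s) = @_;
     $s->{pending}--;
     $s->{shut_down} = 1 if !$s->{pending} && $s->{finish};
 }

 finish_requested(\%state);   # two queries still pending: only set the flag
 query_done(\%state);         # one left
 query_done(\%state);         # last one done: shut down
 print "shut down: $state{shut_down}\n";

```
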
=cut

sub finish {
    my $self = shift;

    $self->{finish} = 1;

    unless ($self->{pending_query_count}) {
      $self->debug and carp "QA: finish() called without pending queries. Shutting down now.";
      $self->{helper}->exit_all();
    }
    else {
      $self->debug && carp "QA: Setting finish flag for later.\n";
    }
}

# }}} shutdown

#========================================================================================

# {{{ STATES

# {{{ _start

sub _start {
    my ($self, $kernel, $heap, $dsn, $queries) = @_[OBJECT, KERNEL, HEAP, ARG0, ARG1];

    $self->debug && warn __PACKAGE__ . " received _start.\n";

    # make this session accessible to the others.
    #$kernel->alias_set( 'qa' );

    my $queue = POE::Component::DBIAgent::Queue->new();
    $self->{filter} = POE::Filter::Reference->new();

    ## Requests to and replies from the children are serialized Perl
    ## references (POE::Filter::Reference), not lines of text.
    foreach (1..$self->{count}) {
	my $helper = POE::Wheel::Run->new(
					  Program     => sub {
					      POE::Component::DBIAgent::Helper->run($self->{dsn}, $self->{queries});
					  },
					  StdoutEvent => 'db_reply',
					  StderrEvent => 'remote_stderr',
					  ErrorEvent  => 'error',
					  #StdinFilter => POE::Filter::Line->new(),
					  StdinFilter => POE::Filter::Reference->new(),
					  StdoutFilter => POE::Filter::Reference->new(),
					 )
	  or do { warn "Can't create new Wheel::Run: $!\n"; next };
	$self->debug && warn __PACKAGE__, " Started db helper pid ", $helper->PID, " wheel ", $helper->ID, "\n";
	$queue->add($helper);
    }

    $self->{helper} = $queue;

}

# }}} _start
# {{{ _stop

sub _stop {
    my ($self, $heap) = @_[OBJECT, HEAP];

    $self->{helper}->kill_all();

    # Oracle clients don't like to TERMinate sometimes.
    $self->{helper}->kill_all(9);
    $self->debug && warn __PACKAGE__ . " has stopped.\n";

}

# }}} _stop

# {{{ db_reply

sub db_reply {
    my ($kernel, $self, $heap, $input) = @_[KERNEL, OBJECT, HEAP, ARG0];

    # Dispatch the reply (already thawed into a hashref by
    # POE::Filter::Reference) to the receiving session and state.
    my ($package, $state, $data, $cookie, $group);
    $package = $input->{package};
    $state = $input->{state};
    $data = $input->{data};
    $group = $input->{group} || 0;
    # cookies are kept locally; they are no longer sent over the reference channel
    $cookie = $self->{cookies}[$input->{id}];

    # Check for undef before using $data in string comparisons below.
    unless (defined $data) {
	$self->debug && warn "QA: Empty input value.\n";
	return;
    }

    unless (ref $data or $data eq 'EOF') {
	warn "QA: Got $data\n";
    }
    # $self->debug && $self->debug && warn "QA: received db_reply for $package => $state\n";

    if ($data eq 'EOF') {
	# $self->debug && warn "QA: ${package}::${state} (#$input->{id}): EOF\n";
        $self->{pending_query_count}--;
	$self->{active_query_count}--;

	$self->debug
	  && warn sprintf("QA:(#%s) %d pending: EOF => %s\n",
			  $input->{id}, $self->{pending_query_count},
			 "$input->{package}::$input->{state}");

        # If this was the last query to go, and we've been requested
        # to finish, then turn out the lights.
        unless ($self->{pending_query_count}) {
          if ($self->{finish}) {
            $self->debug and warn "QA: Last query done, and finish flag set.  Shutting down.\n";
            $self->{helper}->exit_all();
          }
        }
        elsif ($self->debug and $self->{pending_query_count} < 0) {
          die "QA: Pending query count went negative (should never do that)";
        }

	# place this agent at the front of the queue, for next query
	$self->{helper}->make_next($input->{id});

	if ( $self->{pending_queries} and
	     @{$self->{pending_queries}} and
	     $self->{active_query_count} < $self->{count}
	   ) {

	    my $input = shift @{$self->{pending_queries}};
	    my $agent = $self->{helper}->next;

	    $input->{id} = $agent->ID;
	    $self->{cookies}[$input->{id}] = delete $input->{cookie};
	    $agent->put( $input );
	    $self->{active_query_count}++;

	    $self->debug &&
	      warn sprintf("QA:(#%s) %d pending: %s => %s\n",
			 $input->{id}, $self->{pending_query_count},
			   $input->{query},
			   "$input->{package}::$input->{state}"
			  );

	}
    }
    if ($group) {
        push @{ $self->{group_cache}[$input->{id}] }, $data;
	if (scalar @{ $self->{group_cache}[$input->{id}] } == $group || $data eq 'EOF') {
	    $kernel->post($package => $state => $self->{group_cache}[$input->{id}], $cookie);
	    $self->{group_cache}[$input->{id}] = [];
	}
    } else {
        $kernel->post($package => $state => $data => $cookie);
    }


}

# {{{ remote_stderr

sub remote_stderr {
    # StderrEvent handlers receive the line of text in ARG0 and the wheel ID in ARG1.
    my ($self, $kernel, $line, $wheel_id) = @_[OBJECT, KERNEL, ARG0, ARG1];

    $self->debug && warn "$line\n";

    # Keep the positional layout for ErrorState: message in ARG0, wheel ID in ARG2.
    $kernel->post(@{$self->{errorstate}}, $line, undef, $wheel_id) if defined $self->{errorstate};
}

# }}} remote_stderr
# {{{ error

sub error {
    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3];

    $errstr = "child process closed connection" unless $errnum;
    $self->debug and warn "error: Wheel $wheel_id generated $operation error $errnum: $errstr\n";

    $self->{helper}->remove_by_wheelid($wheel_id);
}

# }}} error

# }}} STATES

1;

__END__

=head1 NOTES

=over

=item *

Error handling is practically non-existent.

=item *

The calling syntax is still pretty weak... but improving.  The
optional C<\%args> hashref accepted by C<query()> now lets each query
be called with its own individual characteristics.

=item *

Rows can be returned as hashrefs instead of arrayrefs by passing the
C<hash> argument to C<query()>.

=item *

Every query is prepared at Helper startup.  This could potentially be
pretty expensive.  Cached or deferred preparation might be better,
considering that not every helper is going to run every query,
especially if you have a lot of miscellaneous queries.

=back

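The deferred preparation suggested above could look roughly like the
sketch below.  It is hypothetical and not part of DBIAgent or its
Helper: C<statement_for> prepares a statement on first use and caches
it, and the C<$prepare> callback stands in for C<< $dbh->prepare >> so
that the example runs without a database.

```perl

 use strict;
 use warnings;

 my %queries = (
     sysdate         => "select sysdate from dual",
     employee_record => "select * from emp where id = ?",
 );

 my @prepared;                                    # records each prepare call
 my $prepare = sub { push @prepared, $_[0]; return { sql => $_[0] } };

 my %sth_cache;

 # Hypothetical: prepare a named query the first time it is requested.
 sub statement_for {
     my ($name) = @_;
     die "unknown query '$name'\n" unless exists $queries{$name};
     $sth_cache{$name} //= $prepare->($queries{$name});
     return $sth_cache{$name};
 }

 statement_for('employee_record') for 1 .. 3;     # prepared once, reused twice
 print scalar(@prepared), " prepare call(s)\n";

```

With this approach a helper that never runs C<sysdate> never pays to
prepare it; the trade-off is that a bad SQL string is reported on
first use instead of at startup.
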
Suggestions welcome!  Diffs I<more> welcome! :-)

=head1 AUTHOR

This module has been fine-tuned and packaged by Rob Bloodgood
E<lt>robb@empire2.comE<gt>.  However, most of the queuing code
originated with Fletch E<lt>fletch@phydeaux.orgE<gt>, either directly
or via his ideas.  Thank you for making this module a reality, Fletch!

However, I own all of the bugs.

This module is free software; you may redistribute it and/or modify it
under the same terms as Perl itself.

=cut
