package POE::Component::DBIAgent;

# {{{ POD

=head1 NAME

POE::Component::DBIAgent - POE Component for running asynchronous DBI calls.

=head1 SYNOPSIS

 sub _start {
   my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

   $heap->{helper} = POE::Component::DBIAgent->new( DSN => [$dsn,
                                                            $username,
                                                            $password
                                                           ],
                                                    Queries => $self->make_queries,
                                                    Count => 3,
                                                    Debug => 1,
                                                  );

   # Queries takes a hashref of the form:
   # { query_name => 'select blah from table where x = ?',
   #   other_query => 'select blah_blah from big_view',
   #   etc.
   # }

   $heap->{helper}->query(query_name =>
                          { cookie => 'starting_query' },
                          session => 'get_row_from_dbiagent');

 }

 sub get_row_from_dbiagent {
   my ($kernel, $self, $heap, $row, $cookie) = @_[KERNEL, OBJECT, HEAP, ARG0, ARG1];
   if ($row ne 'EOF') {

 # {{{ PROCESS A ROW

     #row is a listref of columns

 # }}} PROCESS A ROW

   } else {

 # {{{ NO MORE ROWS

     #cleanup code here

 # }}} NO MORE ROWS

   }

 }


=head1 DESCRIPTION

DBIAgent is your answer to non-blocking DBI in POE.

It fires off a configurable number of child processes (defaults to 3) and
feeds database queries to them via two-way pipe (or sockets ... however
POE::Wheel::Run is able to manage it).  The primary method
is C<query>.

=head2 Usage

After initializing a DBIAgent and storing it in a session's heap, one
executes a C<query> (or C<query_slow>) with the query name,
destination session (name or id) and destination state (as well as any
query parameters, optionally) as arguments.  As each row of data comes
back from the query, the destination state (in the destination
session) is invoked with that row of data in its C<$_[ARG0]> slot.  When
there are no more rows to return, the data in C<$_[ARG0]> is the string
'EOF'.

Not EVERY query should run through the DBIAgent.
If you need to run a
short lookup from within a state, sometimes it can be a hassle to have
to define a whole separate state to receive its value, and resume
processing from there.  The determining factor, of course, is how
long your query will take to execute.  If you are trying to retrieve
one row from a properly indexed table, use
C<$dbh-E<gt>selectrow_array()>.  If there's a join involved, or
multiple rows, or a view, you probably want to use DBIAgent.  If it's
a longish query and startup costs (time) don't matter to you, go ahead
and do it inline... but remember the whole of your program suspends
waiting for the result.  If startup costs DO matter, use DBIAgent.

=head2 Return Values

The destination state in the destination session (specified in the
call to C<query()>) will receive the return values from the query in
its C<$_[ARG0]> parameter.  DBIAgent invokes DBI's C<fetch> method
internally, so the value will be a reference to an array.  If your
query returns multiple rows, then your state will be invoked multiple
times, once per row.  B<ADDITIONALLY>, your state will be called one
time with C<$_[ARG0]> containing the string 'EOF'.  'EOF' is returned I<even
if the query doesn't return any other rows>.  This is also what to
expect for DML (INSERT, UPDATE, DELETE) queries.  A way to utilise
this might be as follows:

 sub some_state {
     #...
     if ($enough_values_to_begin_updating) {

         $heap->{dbiagent}->query(update_values_query =>
                                  this_session =>
                                  update_next_value =>
                                  shift @{$heap->{values_to_be_updated}}
                                 );
     }
 }

 sub update_next_value {
     my ($self, $heap) = @_[OBJECT, HEAP];
     # we got 'EOF' in ARG0 here but we don't care... we know that an
     # update has been executed.

     for (1..3) {               # Do three at a time!
121 my $value; 122 last unless defined ($value = shift @{$heap->{values_to_be_updated}}); 123 $heap->{dbiagent}->query(update_values => 124 this_session => 125 update_next_value => 126 $value 127 ); 128 } 129 130 } 131 132=cut 133 134# }}} POD 135 136#use Data::Dumper; 137use Storable qw/freeze thaw/; 138use Carp; 139 140use strict; 141use POE qw/Session Filter::Reference Wheel::Run Component::DBIAgent::Helper Component::DBIAgent::Queue/; 142 143use vars qw/$VERSION/; 144 145$VERSION = sprintf("%d.%02d", q$Revision: 0.26 $ =~ /(\d+)\.(\d+)/); 146 147use constant DEFAULT_KIDS => 3; 148 149sub debug { $_[0]->{debug} } 150#sub debug { 1 } 151#sub debug { 0 } 152 153#sub carp { warn @_ } 154#sub croak { die @_ } 155 156# {{{ new 157 158=head2 new() 159 160Creating an instance creates a POE::Session to manage communication 161with the Helper processes. Queue management is transparent and 162automatic. The constructor is named C<new()> (surprised, eh? Yeah, 163me too). The parameters are as follows: 164 165=over 166 167=item DSN 168 169An arrayref of parameters to pass to DBI->connect (usually a dsn, 170username, and password). 171 172=item Queries 173 174A hashref of the form Query_Name => "$SQL". For example: 175 176 { 177 sysdate => "select sysdate from dual", 178 employee_record => "select * from emp where id = ?", 179 increase_inventory => "update inventory 180 set count = count + ? 181 where item_id = ?", 182 } 183 184As the example indicates, DBI placeholders are supported, as are DML 185statements. 186 187=item Count 188 189The number of helper processes to spawn. Defaults to 3. The optimal 190value for this parameter will depend on several factors, such as: how 191many different queries your program will be running, how much RAM you 192have, how often you run queries, and most importantly, how many 193queries you intend to run I<simultaneously>. 

=item ErrorState

A listref containing a session and event name to receive error
messages from the DBI.  The message arrives in ARG0.

=item Debug

A boolean.  When true, the component emits diagnostic messages (via
C<warn> and C<carp>) as queries are queued, dispatched and completed.
Defaults to false.

=back

=cut

# Constructor.  Takes a flat list of Name => value pairs (DSN, Queries,
# Count, Debug, ErrorState), validates them, builds the object and
# creates the POE::Session whose object states are methods of this
# object.  Croaks on an odd-length argument list, on a missing or
# mistyped DSN or Queries, and on an ErrorState that is not an array
# reference.  Unknown parameters only produce a carp.  Returns the new
# object.
sub new {
    my $type = shift;

    croak "$type needs an even number of parameters" if @_ & 1;
    my %params = @_;

    my $dsn = delete $params{DSN};
    croak "$type needs a DSN parameter" unless defined $dsn;
    croak "DSN needs to be an array reference" unless ref $dsn eq 'ARRAY';

    my $queries = delete $params{Queries};
    croak "$type needs a Queries parameter" unless defined $queries;
    croak "Queries needs to be a hash reference" unless ref $queries eq 'HASH';

    my $count = delete $params{Count} || DEFAULT_KIDS;
    my $debug = delete $params{Debug} || 0;

    # A false ErrorState means "none", as before.  Anything else must be
    # an array reference because remote_stderr expands it with @{...}
    # when forwarding a message.
    my $errorstate = delete $params{ErrorState};
    $errorstate = undef unless $errorstate;
    croak "ErrorState needs to be an array reference"
        if defined $errorstate and ref $errorstate ne 'ARRAY';

    # Make sure the user didn't pass in parameters we're not aware of.
    if (scalar keys %params) {
        carp( "unknown parameters in $type constructor call: ",
              join(', ', sort keys %params)
            );
    }

    # Every field must be in place before the session is created: the
    # _start state reads count, dsn, queries and debug from the object.
    my $self = bless {
        dsn                 => $dsn,
        queries             => $queries,
        count               => $count,
        debug               => $debug,
        errorstate          => $errorstate,
        finish              => 0,
        pending_query_count => 0,
        active_query_count  => 0,
        pending_queries     => [],
        cookies             => [],
        group_cache         => [],
    }, $type;

#     POE::Session->new( $self,
#                      [ qw [ _start _stop db_reply remote_stderr error ] ]
#                    );

    POE::Session->create( object_states =>
                          [ $self => [ qw [ _start _stop db_reply remote_stderr error ] ] ]
                        );

    return $self;

}

# }}} new

# {{{ query

# {{{ POD

=head2 query(I<$query_name>, [ \%args, ] I<$session>, I<$state>, [ I<@parameters> ])

The C<query()> method takes at least three parameters, plus any bind
values for the specific query you are executing.

=over

=item $query_name

This parameter must be one of the keys to the Queries hashref you
passed to the constructor.  It is used to indicate which query you
wish to execute.

=item \%args

This is an OPTIONAL hashref of arguments to pass to the query.

Currently supported arguments:

=over 4

=item hash

Return rows as hash references instead of array references.

=item cookie

A cookie to pass to this query.  This is passed back unchanged to the
destination state in C<$_[ARG1]>.  Can be any scalar (including
references, and even POE postbacks, so be careful!).  You can use this
as an identifier if you have one destination state handling multiple
different queries or sessions.

=item delay

Insert a 1ms delay between each row of output.

I know what you're thinking: "WHY would you want to slow down query
responses?!?!?"
It has to do with CONCURRENCY.  When a response
(finally) comes in from the agent after running the query, it floods
the input channel with response data.  This has the effect of
monopolizing POE's attention, so that any other handles (network
sockets, pipes, file descriptors) keep getting pushed further back on
the queue, and to all other processes EXCEPT the agent, your POE
program looks hung for the amount of time it takes to process all of
the incoming query data.

So, we insert 1ms of time via Time::HiRes's C<usleep> function.  In
human terms, this is essentially negligible.  But it is just enough
time to allow competing handles (sockets, files) to trigger
C<select()>, and get handled by the POE::Kernel, in situations where
concurrency has priority over transfer rate.

Naturally, the Time::HiRes module is required for this functionality.
If Time::HiRes is not installed, the delay is ignored.

=item group

Sends the return event back when C<group> rows are retrieved from the
database, to avoid event spam when selecting lots of rows.  NB: using
group means that C<$row> will be an arrayref of rows, not just a single
row.

=back

=item $session, $state

These parameters indicate the POE state that is to receive the data
returned from the database.  The state indicated will receive the data
in its C<$_[ARG0]> parameter.  I<PLEASE> make sure this is a valid
state, otherwise you will spend a LOT of time banging your head
against the wall wondering where your query data is.

=item @parameters

These are any parameters your query requires.  B<WARNING:> You must
supply exactly as many parameters as your query has placeholders!
This means that if your query has NO placeholders, then you should
pass NO extra parameters to C<query>.

Suggestions to improve this syntax are welcome.

=back

=cut

# }}} POD

# Submit the named query.  Arguments after $self are: the query name,
# an optional options hashref, the destination session and state, and
# any bind parameters.  The request is handed to a helper at once while
# fewer than {count} queries are active; otherwise it is appended to
# {pending_queries} and dispatched later from db_reply.  Croaks if the
# options argument is a reference but not a hash reference.
sub query {
    my ($self, $query, $package, $state, @rest) = @_;
    my $options = {};

    if (ref $package) {
        # A non-hash reference used to earn only a carp and then died
        # (or misbehaved) when expanded with %$options below.
        croak "Options hash must be a HASH reference"
            unless ref $package eq 'HASH';
        $options = $package;

        # The options hashref pushed everything one slot to the right:
        # what arrived as $state is the session, and the state is the
        # first element of @rest.
        ($package, $state) = ($state, shift @rest);
    }

    # warn "QD: Running $query";

    # NOTE(review): next() is called even when the query is only queued
    # below.  Kept as-is because the Queue class is not visible here --
    # confirm whether the call has side effects before moving it.
    my $agent = $self->{helper}->next;

    # Options come last so they can add keys (cookie, group, hash, ...)
    # or override the defaults listed here.
    my $input = { query   => $query,
                  package => $package,
                  state   => $state,
                  params  => \@rest,
                  delay   => 0,
                  id      => "_",
                  %$options,
                };

    $self->{pending_query_count}++;
    if ($self->{active_query_count} < $self->{count}) {
        $self->_dispatch($agent, $input);
        $self->{group_cache}[$input->{id}] = [];
    }
    else {
        push @{$self->{pending_queries}}, $input;
    }

    $self->debug
      && warn sprintf("QA:(#%s) %d pending: %s => %s, return %d rows at once\n",
                      $input->{id}, $self->{pending_query_count},
                      $input->{query},
                      "$input->{package}::$input->{state}",
                      $input->{group} || 1,
                     );

    return;
}

# Private helper shared by query() and db_reply(): stamp the request
# with the agent's wheel id, stash the cookie locally, send the request
# to the helper, and count it as active.
sub _dispatch {
    my ($self, $agent, $input) = @_;

    $input->{id} = $agent->ID;

    # The cookie is removed from the request before put(), so it is not
    # sent to the helper; db_reply looks it up again by wheel id.
    $self->{cookies}[$input->{id}] = delete $input->{cookie};

    $agent->put( $input );
    $self->{active_query_count}++;

    return;
}

# }}} query

#========================================================================================
# {{{ shutdown

=head2 finish()

The C<finish()> method tells DBIAgent that the program is finished
sending queries.  DBIAgent will shut its helpers down gracefully after
they complete any pending queries.  If there are no pending queries,
the DBIAgent will shut down immediately.

=cut

# Set the finish flag.  With nothing pending the helpers are told to
# exit right away; otherwise db_reply does so when the last pending
# query reports EOF.
sub finish {
    my $self = shift;

    $self->{finish} = 1;

    if ($self->{pending_query_count}) {
        $self->debug && carp "QA: Setting finish flag for later.\n";
    }
    else {
        $self->debug and carp "QA: finish() called without pending queries. Shutting down now.";
        $self->{helper}->exit_all();
    }

    return;
}

# }}} shutdown

#========================================================================================

# {{{ STATES

# {{{ _start

# Session start-up: spawn {count} helper processes, each wrapped in a
# POE::Wheel::Run, and collect them in a Queue stored in {helper}.
sub _start {
    my $self = $_[OBJECT];

    $self->debug && warn __PACKAGE__ . " received _start.\n";

    # make this session accessible to the others.
    #$kernel->alias_set( 'qa' );

    my $queue = POE::Component::DBIAgent::Queue->new();
    $self->{filter} = POE::Filter::Reference->new();

    # Requests and replies travel over the helpers' stdin and stdout
    # through POE::Filter::Reference; no StderrFilter is specified.
    foreach (1..$self->{count}) {
        my $helper = POE::Wheel::Run->new(
                                          Program     => sub {
                                              POE::Component::DBIAgent::Helper->run($self->{dsn}, $self->{queries});
                                          },
                                          StdoutEvent => 'db_reply',
                                          StderrEvent => 'remote_stderr',
                                          ErrorEvent  => 'error',
                                          #StdinFilter => POE::Filter::Line->new(),
                                          StdinFilter => POE::Filter::Reference->new(),
                                          StdoutFilter => POE::Filter::Reference->new(),
                                         );

        # A failed wheel must not be queued; the old code warned and
        # then still called ->PID on it and added it to the queue.
        unless ($helper) {
            warn "Can't create new Wheel::Run: $!\n";
            next;
        }

        $self->debug && warn __PACKAGE__, " Started db helper pid ", $helper->PID, " wheel ", $helper->ID, "\n";
        $queue->add($helper);
    }

    $self->{helper} = $queue;

}

# }}} _start
# {{{ _stop

# Session shutdown: signal every helper, first with kill_all()'s
# default and then with signal 9.
sub _stop {
    my $self = $_[OBJECT];

    $self->{helper}->kill_all();

    # Oracle clients don't like to TERMinate sometimes.
    $self->{helper}->kill_all(9);
    $self->debug && warn __PACKAGE__ . " has stopped.\n";

}

# }}} _stop

# {{{ db_reply

# StdoutEvent handler: one decoded reply from a helper arrives in ARG0
# as a hashref carrying package, state, data, group and id.  The data
# (a row, or the string 'EOF') is posted to the destination state with
# the query's cookie.  On EOF the counters are updated, the helpers are
# shut down if finish() was requested and nothing is pending, and the
# next queued query (if any) is dispatched.
sub db_reply {
    my ($kernel, $self, $input) = @_[KERNEL, OBJECT, ARG0];

    my $data = $input->{data};

    # Test for a missing payload before anything compares it to 'EOF'.
    unless (defined $data) {
        $self->debug && warn "QA: Empty input value.\n";
        return;
    }

    my $id      = $input->{id};
    my $package = $input->{package};
    my $state   = $input->{state};
    my $group   = $input->{group} || 0;
    my $is_eof  = $data eq 'EOF';

    # Cookies are not sent over the reference channel; they are kept
    # here, indexed by wheel id.
    my $cookie = $self->{cookies}[$id];

    unless (ref $data or $is_eof) {
        warn "QA: Got $data\n";
    }
    # $self->debug && $self->debug && warn "QA: received db_reply for $package => $state\n";

    if ($is_eof) {
        # $self->debug && warn "QA: ${package}::${state} (#$input->{id}): EOF\n";
        $self->{pending_query_count}--;
        $self->{active_query_count}--;

        # The query is finished, so release our reference to its cookie
        # (it may be a postback); $cookie above still holds it for the
        # final post.
        $self->{cookies}[$id] = undef;

        $self->debug
          && warn sprintf("QA:(#%s) %d pending: EOF => %s\n",
                          $id, $self->{pending_query_count},
                          "${package}::${state}");

        # If this was the last query to go, and we've been requested
        # to finish, then turn out the lights.
        if (!$self->{pending_query_count}) {
            if ($self->{finish}) {
                $self->debug and warn "QA: Last query done, and finish flag set. Shutting down.\n";
                $self->{helper}->exit_all();
            }
        }
        elsif ($self->debug and $self->{pending_query_count} < 0) {
            die "QA: Pending query count went negative (should never do that)";
        }

        # place this agent at the front of the queue, for next query
        $self->{helper}->make_next($id);

        if ( $self->{pending_queries} and
             @{$self->{pending_queries}} and
             $self->{active_query_count} < $self->{count}
           ) {

            my $queued = shift @{$self->{pending_queries}};
            $self->_dispatch($self->{helper}->next, $queued);

            $self->debug &&
              warn sprintf("QA:(#%s) %d pending: %s => %s\n",
                           $queued->{id}, $self->{pending_query_count},
                           $queued->{query},
                           "$queued->{package}::$queued->{state}"
                          );

        }
    }

    if ($group) {
        # Collect rows until $group of them are cached or EOF arrives,
        # then post the whole batch.  'EOF' itself is the batch's last
        # element.
        push @{ $self->{group_cache}[$id] }, $data;
        if (scalar @{ $self->{group_cache}[$id] } == $group || $is_eof) {
            $kernel->post($package => $state => $self->{group_cache}[$id], $cookie);
            $self->{group_cache}[$id] = [];
        }
    }
    else {
        $kernel->post($package => $state => $data => $cookie);
    }

    return;
}

# }}} db_reply

# {{{ remote_stderr

# StderrEvent handler: a helper wrote to its stderr.  POE::Wheel::Run
# delivers the text in ARG0 and the wheel id in ARG1.  The text is
# warned about in debug mode and forwarded to the ErrorState
# destination, if one was configured.
sub remote_stderr {
    my ($self, $kernel, $line, $wheel_id) = @_[OBJECT, KERNEL, ARG0, ARG1];

    $self->debug && warn "$line\n";

    # The forwarded event keeps its historical layout: message in ARG0,
    # an always-undef slot in ARG1, wheel id in ARG2.  The old code
    # unpacked @_ as if this were an ErrorEvent, so ARG2 was always
    # undef.
    $kernel->post(@{$self->{errorstate}}, $line, undef, $wheel_id)
        if defined $self->{errorstate};

    return;
}

# }}} remote_stderr
# {{{ error

# ErrorEvent handler: report the failure in debug mode and drop the
# affected wheel from the helper queue.
sub error {
    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3];

    # A false error number is reported as the child closing its end.
    $errstr = "child process closed connection" unless $errnum;
    $self->debug and warn "error: Wheel $wheel_id generated $operation error $errnum: $errstr\n";

    $self->{helper}->remove_by_wheelid($wheel_id);

    return;
}

# }}} error

# }}} STATES

1;

__END__

=head1 NOTES

=over

=item *

Error handling is practically non-existent.

=item *

The calling syntax is still pretty weak... but improving.  We may
eventually add an optional attributes hash so that each query can be
called with its own individual characteristics.

=item *

I might eventually want to support returning hashrefs, if there is any
demand.

=item *

Every query is prepared at Helper startup.  This could potentially be
pretty expensive.  Perhaps a cached or deferred loading might be
better?  This is considering that not every helper is going to run
every query, especially if you have a lot of miscellaneous queries.

=back

Suggestions welcome!  Diffs I<more> welcome!  :-)

=head1 AUTHOR

This module has been fine-tuned and packaged by Rob Bloodgood
E<lt>robb@empire2.comE<gt>.  However, most of the queuing code
originated with Fletch E<lt>fletch@phydeaux.orgE<gt>, either directly
or via his ideas.  Thank you for making this module a reality, Fletch!

However, I own all of the bugs.

This module is free software; you may redistribute it and/or modify it
under the same terms as Perl itself.

=cut