package DBD::Multi;
# $Id: Multi.pm,v 1.24 2010/09/05 20:28:21 wright Exp $
use strict;

use DBI;
DBI->setup_driver('DBD::Multi');

# Package globals: the module version, the error/state slots that DBI reads
# through the references handed to _new_drh() below, and the cached driver
# handle.
our ($VERSION, $err, $errstr, $sqlstate, $drh);

$VERSION = '0.16';

$err      = 0;     # DBI::err
$errstr   = "";    # DBI::errstr
$sqlstate = "";    # DBI::state
$drh      = undef;

# driver($class, \%attr)
#
# Called by DBI to obtain the driver handle. The handle is created on the
# first call and cached in $drh; later calls return the cached handle.
sub driver {
    return $drh if $drh;
    my ($class, $attr) = @_;
    $class .= '::dr';

    $drh = DBI::_new_drh($class, {
        Name        => 'Multi',
        Version     => $VERSION,
        Err         => \$DBD::Multi::err,
        Errstr      => \$DBD::Multi::errstr,
        State       => \$DBD::Multi::sqlstate,
        Attribution => 'DBD::Multi, pair Networks Inc.',
    });
    # This doesn't work without formal registration with DBI
    #DBD::Multi::db->install_method('multi_do_all');
    return $drh;
}

#######################################################################
package DBD::Multi::dr;
use strict;

$DBD::Multi::dr::imp_data_size = 0;
use DBD::File;

sub DESTROY { shift->STORE(Active => 0) }

# connect($drh, $dbname, $user, $auth, \%attr)
#
# Builds the database handle and the DBD::Multi::Handler behind it.
#   $attr->{dsns}     - optional arrayref of priority => source pairs.
#   $dbname           - if it contains "dsn=X", X (with $user/$auth) is added
#                       as an extra source at priority -1. Priorities are
#                       tried in ascending numeric order, so it goes ahead of
#                       any source with a priority above -1. NOTE(review):
#                       presumably this is how DBI_AUTOPROXY='dbi:Multi:'
#                       (see TESTING in the POD) hands over the original DSN.
#   $attr->{failed_max}, $attr->{failed_expire}, $attr->{timeout}
#                     - optional overrides of the handler's defaults.
# Returns the new, active database handle.
sub connect {
    my ($drh, $dbname, $user, $auth, $attr) = @_;
    my $dbh = DBI::_new_dbh(
        $drh => {
            Name         => $dbname,
            USER         => $user,
            CURRENT_USER => $user,
        },
    );
    my @dsns = $attr->{dsns} && ref($attr->{dsns}) eq 'ARRAY'
        ? @{ $attr->{dsns} }
        : ();

    if ( $dbname =~ /dsn=(.*)/ ) {
        push @dsns, ( -1, [$1, $user, $auth] );
    }

    my $handler = DBD::Multi::Handler->new({
        dsources => [ @dsns ],
    });

    # Tuning attributes documented in the POD. 'timeout' used to be ignored
    # here, so every connection attempt ran with the handler's default.
    $handler->failed_max($attr->{failed_max})
        if exists $attr->{failed_max};
    $handler->failed_expire($attr->{failed_expire})
        if exists $attr->{failed_expire};
    $handler->timeout($attr->{timeout})
        if exists $attr->{timeout};

    $dbh->STORE(_handler => $handler);
    $dbh->STORE(handler => $handler); # temporary
    $drh->{_handler} = $handler;
    $dbh->STORE(Active => 1);
    return $dbh;
}

# List every configured data source (as stored by the handler).
sub data_sources { shift->FETCH('_handler')->all_sources }

#######################################################################
package DBD::Multi::db;
use strict;

$DBD::Multi::db::imp_data_size = 0;

# prepare($statement, @attribs)
#
# Returns a DBD::Multi statement handle that wraps a statement prepared on
# one backend handle obtained from the handler. When a backend's prepare()
# returns false, that backend is recorded as failed and the next handle from
# the handler is tried. The handler dies ("All data sources failed!") once
# no usable source remains. The chosen backend stays fixed for the lifetime
# of the returned statement handle.
sub prepare {
    my ($dbh, $statement, @attribs) = @_;

    # create a 'blank' sth
    my ($outer, $sth) = DBI::_new_sth($dbh, { Statement => $statement });

    my $handler = $dbh->FETCH('_handler');
    $sth->STORE(_handler => $handler);

    my $_dbh = $handler->dbh;
    my $_sth;
    until ( $_sth ) {
        $_sth = $_dbh->prepare($statement, @attribs);
        unless ( $_sth ) {
            $handler->dbh_failed;
            $_dbh = $handler->dbh;
        }
    }

    $sth->STORE(NUM_OF_PARAMS => $_sth->FETCH('NUM_OF_PARAMS'));
    $sth->STORE(_dbh => $_dbh);
    $sth->STORE(_sth => $_sth);

    return $outer;
}

# Mark this handle inactive. The backend handles are left to the handler.
sub disconnect {
    my ($dbh) = @_;
    $dbh->STORE(Active => 0);
    1;
}

# commit()/rollback(): delegate to the '_dbh' attribute of this handle, if
# the handle is active and that attribute is set.
# NOTE(review): nothing in this file stores '_dbh' on the database handle
# (prepare() stores it on the statement handle), so as written both fall
# through to the bare return. Read/write use is documented as unsupported.
sub commit {
    my ($dbh) = @_;
    if ( $dbh->FETCH('Active') ) {
        return $dbh->FETCH('_dbh')->commit if $dbh->FETCH('_dbh');
    }
    return;
}

sub rollback {
    my ($dbh) = @_;
    if ( $dbh->FETCH('Active') ) {
        return $dbh->FETCH('_dbh')->rollback if $dbh->FETCH('_dbh');
    }
    return;
}

# Store any attribute directly in the handle's hash.
sub STORE {
    my ($self, $attr, $val) = @_;
    $self->{$attr} = $val;
}

sub DESTROY { shift->disconnect }

#######################################################################
package DBD::Multi::st;
use strict;

$DBD::Multi::st::imp_data_size = 0;

# Methods forwarded verbatim to the wrapped statement handle (see the loop
# at the end of this package).
our @METHODS = qw[
    bind_param
    bind_param_inout
    bind_param_array
    execute_array
    execute_for_fetch
    fetch
    fetchrow_arrayref
    fetchrow_array
    fetchrow_hashref
    fetchall_arrayref
    fetchall_hashref
    bind_col
    bind_columns
    dump_results
];

# Attributes copied from the wrapped statement handle after execute().
our @FIELDS = qw[
    NUM_OF_FIELDS
    CursorName
    ParamValues
    RowsInCache
];

# execute(@bind_values)
#
# Runs the wrapped statement. Bind values given here are remembered in
# f_params, and a later execute() with no arguments reuses them. Any
# still-active execution is finished first. Returns the wrapped execute()'s
# return value.
sub execute {
    my $sth  = shift;
    my $_sth = $sth->FETCH('_sth');
    my $params = @_
        ? ( $sth->{f_params} = [ @_ ] )
        : $sth->{f_params};

    $sth->finish if $sth->FETCH('Active');
    $sth->{Active} = 1;
    my $rc = $_sth->execute(@{$params});

    # Copy each defined field from the wrapped handle unless this handle
    # already reports a defined value for it.
    for my $field ( @FIELDS ) {
        my $value = $_sth->FETCH($field);
        $sth->STORE($field => $value)
            if defined $value && !defined $sth->FETCH($field);
    }

    return $rc;
}

# Prefer the wrapped handle's value for $attrib; fall back to this handle's
# own hash when that value is false.
sub FETCH {
    my ($sth, $attrib) = @_;
    return $sth->{'_sth'}->FETCH($attrib) || $sth->{$attrib};
}

# Store any attribute directly in the handle's hash.
sub STORE {
    my ($self, $attr, $val) = @_;
    $self->{$attr} = $val;
}

sub rows { shift->FETCH('_sth')->rows }

# Mark this handle inactive and finish the wrapped statement.
sub finish {
    my ($sth) = @_;
    $sth->STORE(Active => 0);
    return $sth->FETCH('_sth')->finish;
}

# Generate a thin forwarding method for each name in @METHODS.
foreach my $method ( @METHODS ) {
    no strict 'refs';
    *{$method} = sub { shift->FETCH('_sth')->$method(@_) };
}

#######################################################################
package DBD::Multi::Handler;
use strict;

use base qw[Class::Accessor::Fast];
use Sys::SigAction qw(timeout_call);
use List::Util qw(shuffle);

=begin ImplementationNotes

dsources - This thing changes from an arrayref to a hashref during construction. :(

Initially, when data is passed in during construction, it's an arrayref
containing the 'dsns' param from the user's connect() call.

Later, when _configure_dsources gets called, it turns into a multi-dimensional
hashref:

    $dsources->{$pri}->{$dsource_id} = 1;

The first key is the priority number, the second key is the data source index
number. The value is always just a true value.

nextid - A counter. Stores the index number of the next data source to be added.

all_dsources - A hashref. Maps index number to the data source as given to
connect(): an arrayref of DBI->connect arguments, a database handle, or a code
ref that returns one.

current_dsource - The most recently chosen data source index number.

used - A hashref. Keys are index numbers. Values are true when the data source
has been previously assigned and we want to prefer other data sources of the
same priority (for round-robin load distribution).

failed - A hashref. Keys are index numbers. Values are counters indicating
how many times the data source has failed.

failed_last - A hashref. Keys are index numbers. Values are Unix timestamps
indicating the most recent time a data source failed.

failed_max - A scalar value. Number of times a data source may fail before we
stop trying it.

failed_expire - A scalar value. Number of seconds after a data source's most
recent failure before its failure count is cleared and we'll try it again.

timeout - A scalar value. Number of seconds we try to connect to a data source
before giving up.

=end ImplementationNotes

=cut

__PACKAGE__->mk_accessors(qw[
    dsources
    nextid
    all_dsources
    current_dsource
    used
    failed
    failed_last
    failed_max
    failed_expire
    timeout
]);

# new(\%args)
#
# Class::Accessor::Fast constructor plus defaults: nextid 0, failed_max 3,
# failed_expire 300 seconds, timeout 5 seconds. $args->{dsources}, if given,
# is the flat priority => source list that _configure_dsources turns into
# the per-priority index.
sub new {
    my ($class, $args) = @_;
    my $self = $class->SUPER::new($args);
    $self->nextid(0) unless defined $self->nextid;
    $self->all_dsources({});
    $self->used({});
    $self->failed({});
    $self->failed_last({});
    $self->failed_max(3)         unless defined $self->failed_max;
    $self->failed_expire(60 * 5) unless defined $self->failed_expire;
    $self->timeout(5)            unless defined $self->timeout;
    $self->_configure_dsources;
    return $self;
}

# Return every registered data source (values of all_dsources).
sub all_sources {
    my ($self) = @_;
    return values %{ $self->all_dsources };
}

# add_to_pri($pri, $dsource)
#
# Register $dsource under the next index number, file that index under
# priority $pri, and advance nextid.
sub add_to_pri {
    my ($self, $pri, $dsource) = @_;
    my $dsource_id = $self->nextid;
    my $dsources   = $self->dsources;
    my $all        = $self->all_dsources;

    $all->{$dsource_id} = $dsource;
    $dsources->{$pri}->{$dsource_id} = 1;

    $self->nextid($dsource_id + 1);
}

# Return a connected database handle. Each source that fails to connect is
# recorded via dbh_failed and another is tried. _pick_dsource dies
# ("All data sources failed!") when nothing usable remains.
sub dbh {
    my $self = shift;
    my $dbh = $self->_connect_dsource;
    return $dbh if $dbh;
    $self->dbh_failed;
    $self->dbh;
}

# Record a failure (count and timestamp) for the current data source.
sub dbh_failed {
    my ($self) = @_;

    my $current_dsource = $self->current_dsource;
    $self->failed->{$current_dsource}++;
    $self->failed_last->{$current_dsource} = time;
}

# Forget the failures of any data source whose most recent failure is more
# than failed_expire seconds old.
sub _purge_old_failures {
    my ($self) = @_;
    my $now = time;
    my @all = keys %{ $self->all_dsources };

    foreach my $dsource ( @all ) {
        next unless $self->failed->{$dsource};
        if ( ($now - $self->failed_last->{$dsource}) > $self->failed_expire ) {
            delete $self->failed->{$dsource};
            delete $self->failed_last->{$dsource};
        }
    }
}

# Choose a data source and store its index in current_dsource. Priorities
# are tried in ascending numeric order. Dies when every known data source
# has reached failed_max.
sub _pick_dsource {
    my ($self) = @_;
    $self->_purge_old_failures;
    my $dsources = $self->dsources;
    my @pri = sort { $a <=> $b } keys %{$dsources};

    foreach my $pri ( @pri ) {
        my $dsource = $self->_pick_pri_dsource($dsources->{$pri});
        if ( defined $dsource ) {
            $self->current_dsource($dsource);
            return;
        }
    }

    # No priority level yielded a source: clear the round-robin history and
    # retry, unless every data source has hit the failure limit.
    $self->used({});
    my $failed_out = grep { $self->failed->{$_} >= $self->failed_max }
                     keys %{ $self->failed };
    return $self->_pick_dsource
        if $failed_out < keys %{ $self->all_dsources };
    die("All data sources failed!");
}

# _pick_pri_dsource(\%dsource_ids)
#
# Pick a random index from one priority level that is neither failed nor
# already used, and mark it used. Returns undef when none qualifies.
sub _pick_pri_dsource {
    my ($self, $dsources) = @_;
    my @dsources = sort { $a <=> $b } keys %{$dsources};
    my @used   = grep { exists $self->used->{$_} } @dsources;
    my @failed = grep { exists($self->failed->{$_})
                        && $self->failed->{$_} >= $self->failed_max } @dsources;

    # We've used them all and they all failed. Escalate.
    return if @used == @dsources && @failed == @dsources;

    # We've used them all but some are good. Purge and reuse.
    delete @{ $self->used }{@dsources} if @used == @dsources;

    foreach my $dsource ( shuffle @dsources ) {
        next if $self->failed->{$dsource}
             && $self->failed->{$dsource} >= $self->failed_max;
        next if $self->used->{$dsource};

        $self->used->{$dsource} = 1;
        return $dsource;
    }
    return;
}

# Convert the flat priority => source list held in dsources into the
# per-priority index (see ImplementationNotes). Iteration stops at the first
# incomplete pair. Priority 0 is accepted: the old loop condition
# `while (my $pri = shift ...)` treated it as false and dropped it together
# with every later pair. The caller's arrayref is copied, not emptied.
sub _configure_dsources {
    my ($self) = @_;
    my @pairs = @{ $self->dsources || [] };
    $self->dsources({});

    while ( @pairs ) {
        my $pri     = shift @pairs;
        my $dsource = shift @pairs;
        last unless defined $pri && $dsource;
        $self->add_to_pri($pri => $dsource);
    }
}

# _connect_dsource([$dsource])
#
# Turn a data source into a database handle. Without an argument, a source
# is chosen by _pick_dsource (recorded in current_dsource). Accepted forms:
#   - an existing DBI::db handle: returned as-is;
#   - a code ref: called, and its result returned if it is a DBI::db handle;
#   - otherwise an arrayref of DBI->connect_cached() arguments, attempted
#     under a timeout of $self->timeout seconds.
# Returns undef when no handle was obtained.
sub _connect_dsource {
    my ($self, $dsource) = @_;
    unless ( $dsource ) {
        $self->_pick_dsource;
        $dsource = $self->all_dsources->{$self->current_dsource};
    }

    # Support ready-made handles
    return $dsource if UNIVERSAL::isa($dsource, 'DBI::db');

    # Support code-refs which return handles
    if (ref $dsource eq 'CODE') {
        my $handle = $dsource->();
        return $handle if UNIVERSAL::isa($handle, 'DBI::db');
        return undef; # Connect by coderef failed.
    }

    my $dbh;
    # Cleared so this inner connect is not itself auto-proxied.
    local $ENV{DBI_AUTOPROXY};
    if (timeout_call( $self->timeout, sub { $dbh = DBI->connect_cached(@{$dsource}) } )) {
        #warn "Timeout[", $self->current_dsource, "] at ", time, "\n";
    }
    return $dbh;
}

# Public wrapper around _connect_dsource.
sub connect_dsource {
    my ($self, $dsource) = @_;
    $self->_connect_dsource($dsource);
}

# multi_do_all($code)
#
# Call $code->($dbh) once for every data source that yields a handle. A
# source that is itself a DBD::Multi handle (it carries a 'handler') is
# descended into recursively.
sub multi_do_all {
    my ($self, $code) = @_;

    my @all = values %{ $self->all_dsources };

    foreach my $source ( @all ) {
        my $dbh = $self->connect_dsource($source);
        next unless $dbh;
        if ( $dbh->{handler} ) {
            $dbh->{handler}->multi_do_all($code, $source);
            next;
        }
        $code->($dbh);
    }
}

1;
__END__

=head1 NAME

DBD::Multi - Manage Multiple Data Sources with Failover and Load Balancing

=head1 SYNOPSIS

  use DBI;

  my $other_dbh = DBI->connect(...);

  my $dbh = DBI->connect( 'dbi:Multi:', undef, undef, {
      dsns => [ # in priority order
          10 => [ 'dbi:SQLite:read_one.db', '', '' ],
          10 => [ 'dbi:SQLite:read_two.db', '', '' ],
          20 => [ 'dbi:SQLite:master.db',   '', '' ],
          30 => $other_dbh,
          40 => sub { DBI->connect },
      ],
      # optional
      failed_max    => 1,      # short credibility
      failed_expire => 60*60,  # long memory
      timeout       => 10,     # time out connection attempts after 10 seconds.
  });

=head1 DESCRIPTION

This software manages multiple database connections for failovers and also
simple load balancing. It acts as a proxy between your code and your database
connections, transparently choosing a connection for each query, based on your
preferences and present availability of the DB server.

This module is intended for read-only operations (where some other application
is being used to handle replication).

This software does not prevent write operations from being executed. This is
left up to the user. See L</"SUGGESTED USES"> below for ideas.

The interface is nearly the same as other DBI drivers with one notable
exception: the data sources are configured through the C<dsns> attribute,
described below.

=head2 Configuring DSNs

Specify an attribute to the C<connect()> constructor, C<dsns>. This is a list
of DSNs to configure. The configuration is given in pairs. First comes the
priority of the DSN. Second is the DSN.

The priorities specify which connections should be used first (lowest number
to highest). As long as a connection with the lowest priority number is
responding, connections with higher priority numbers will never be used. If
multiple connections have the same priority, then one connection will be
chosen randomly for each operation. Note that the random DB is chosen when the
statement is prepared. Therefore executing multiple queries on the same
prepared statement handle will always run on the same connection.

The second parameter can be a DBI object, a code ref which returns a DBI
object, or an array reference of parameters to pass to the DBI C<connect()>
constructor. If a set of parameters or a code ref is given, then DBD::Multi
will be able to attempt to reconnect in the event that the connection is lost.
If a DBI object is used, DBD::Multi will give up permanently once that
connection is lost.

These connections are lazy loaded, meaning they aren't made until they are
actually used.

=head2 Configuring Failures

By default, after a data source fails three times, it will not be tried again
for 5 minutes. After that period, the data source will be tried again for
future requests until it reaches its three failure limit (the cycle repeats
forever).

To change the maximum number of failures allowed before a data source is
deemed failed, set the C<failed_max> parameter. To change the amount of
time we remember a data source as being failed, set the C<failed_expire>
parameter in seconds.

=head2 Timing out connections.

By default, if you attempt to connect to an IP that isn't answering, DBI will
hang for a very long period of time. This behavior is not desirable in a
multi-database setup. Instead, it is better to give up on slow connections
and move on to other databases quickly.

DBD::Multi will give up on connection attempts after 5 seconds and then try
another connection. You may set the C<timeout> parameter to change the
timeout time, or set it to 0 to disable the timeout feature completely.

=head1 SUGGESTED USES

Here are some ideas on how to use this module effectively and safely.

It is important to remember that C<DBD::Multi> is not intended for read-write
operations. One suggestion to prevent accidental write operations is to make
sure that the user you are connecting to the databases with has privileges
sufficiently restricted to prevent updates.

Read-write operations should happen through a separate database handle that
will somehow trigger replication to all of your databases. For example, your
read-write handle might be connected to the master server that replicates
itself to all of the subordinate servers.

Read-only database calls within your application should be updated to
explicitly use the read-only (DBD::Multi) handle. It is not necessary to find
every single call that can be load balanced, since they can safely be sent
through the read/write handle as well.

=head1 TODO

There really isn't much of a TODO list for this module at this time. Feel free
to submit a bug report to rt.cpan.org if you think there is a feature missing.

Although there is some code intended for read/write operations, this should be
considered not supported and not actively developed at this time.
The actual read/write code remains undocumented because in the event that I
ever do decide to work on supporting read/write operations, the API is not
guaranteed to stay the same. The focus of this module is presently limited to
read-only operations.

=head1 TESTING

DBD::Multi has its own suite of regression tests. But suppose you want to
verify that you can slip DBD::Multi into whatever application you already have
written without breaking anything.

Thanks to a feature of DBI, you can regression test DBD::Multi using any
existing tests that already use DBI without having to update any of your code.
Simply set the environment variable DBI_AUTOPROXY to 'dbi:Multi:' and then run
your tests. DBD::Multi should act as a silent pipe between your application
and whatever database driver you were previously using. This will help you
verify that you aren't currently using some feature of the DBI that breaks
DBD::Multi (if you are, please do me a favor and submit a bug report so I can
fix it).

=head1 SEE ALSO

L<CGI::Application::Plugin::DBH> - A plugin for the L<CGI::Application> framework
which makes it easy to support two database handles, and also supports lazy-loading.

L<DBD::Multiplex>, L<DBIx::HA> - Two modules similar to DBD::Multi, but with
slightly different objectives.

L<DBI>, L<perl> - You should probably already know about these before using
this module.

=head1 AUTHOR

Initially written by Casey West and Dan Wright for pair Networks, Inc.
(www.pair.com)

Maintained by Dan Wright. <F<DWRIGHT@CPAN.ORG>>.

=cut