1package DBD::Multi;
2# $Id: Multi.pm,v 1.24 2010/09/05 20:28:21 wright Exp $
3use strict;
4
5use DBI;
6DBI->setup_driver('DBD::Multi');
7
use vars qw[$VERSION $err $errstr $sqlstate $drh];

$VERSION = '0.16';

# Error state shared with DBI: driver() passes references to these three
# package variables to DBI::_new_drh as Err, Errstr and State.
($err, $errstr, $sqlstate) = (0, "", "");

# Cached driver handle; driver() fills it in on its first call.
$drh = undef;
16
# Return the driver handle for DBD::Multi, creating it on the first call.
#
# The handle is cached in the package variable $drh, so later calls return
# the same object regardless of their arguments.  On creation the handle is
# built by DBI::_new_drh for the "<class>::dr" package and wired to this
# package's $err / $errstr / $sqlstate variables.
sub driver {
    my ($class, $attr) = @_;

    unless ( $drh ) {
        $drh = DBI::_new_drh("${class}::dr", {
            Name        => 'Multi',
            Version     => $VERSION,
            Err         => \$DBD::Multi::err,
            Errstr      => \$DBD::Multi::errstr,
            State       => \$DBD::Multi::sqlstate,
            Attribution => 'DBD::Multi, pair Networks Inc.',
        });
        # This doesn't work without formal registration with DBI
        #DBD::Multi::db->install_method('multi_do_all');
    }

    return $drh;
}
34
35#######################################################################
36package DBD::Multi::dr;
37use strict;
38
39$DBD::Multi::dr::imp_data_size = 0;
40use DBD::File;
41
# Mark the driver handle inactive when it is destroyed.
sub DESTROY {
    my ($drh) = @_;
    $drh->STORE(Active => 0);
}
43
# Build a DBD::Multi database handle.
#
# Arguments: the driver handle, the DSN remainder, user, password and the
# attribute hash.  Recognised attributes:
#   dsns          - arrayref of priority => data source pairs
#   failed_max    - failures tolerated before a data source is skipped
#   failed_expire - seconds after which recorded failures are forgotten
#   timeout       - seconds allowed for each connection attempt
# A "dsn=..." fragment in the DSN adds one more data source at priority -1,
# using the supplied user and password.
#
# Returns the new database handle, marked Active.  The handler is stored on
# the handle (as _handler and, temporarily, handler) and on the driver handle.
sub connect {
    my($drh, $dbname, $user, $auth, $attr) = @_;
    my $dbh = DBI::_new_dbh(
      $drh => {
               Name         => $dbname,
               USER         => $user,
               CURRENT_USER => $user,
              },
    );
    my @dsns =   $attr->{dsns} && ref($attr->{dsns}) eq 'ARRAY'
               ? @{$attr->{dsns}}
               : ();

    if ( $dbname =~ /dsn=(.*)/ ) {
        push @dsns, ( -1, [$1, $user, $auth] );
    }

    my $handler = DBD::Multi::Handler->new({
        dsources => [ @dsns ],
    });

    # Forward every tunable the caller supplied.  'timeout' used to be
    # dropped here, so the handler always fell back to its own default.
    for my $option ( qw[failed_max failed_expire timeout] ) {
        $handler->$option($attr->{$option})
          if exists $attr->{$option};
    }

    $dbh->STORE(_handler => $handler);
    $dbh->STORE(handler => $handler); # temporary
    $drh->{_handler} = $handler;
    $dbh->STORE(Active => 1);
    return $dbh;
}
75
# List every data source known to the handler stored on this driver handle.
sub data_sources {
    my ($drh) = @_;
    my $handler = $drh->FETCH('_handler');
    return $handler->all_sources;
}
77
78#######################################################################
79package DBD::Multi::db;
80use strict;
81
82$DBD::Multi::db::imp_data_size = 0;
83
# Prepare a statement on whichever data source the handler selects.
#
# A blank statement handle is created, then the statement is prepared on
# the connection returned by the handler.  Each time the underlying prepare
# returns a false value the current data source is reported as failed and
# the handler is asked for another connection.
#
# The inner handle records the handler (_handler), the chosen connection
# (_dbh), the real statement handle (_sth) and its NUM_OF_PARAMS.
# Returns the outer statement handle.
sub prepare {
    my ($dbh, $statement, @attribs) = @_;

    # create a 'blank' sth
    my ($outer, $sth) = DBI::_new_sth($dbh, { Statement => $statement });

    my $handler = $dbh->FETCH('_handler');
    $sth->STORE(_handler => $handler);

    my $_dbh = $handler->dbh;
    my $_sth = $_dbh->prepare($statement, @attribs);
    while ( !$_sth ) {
        $handler->dbh_failed;
        $_dbh = $handler->dbh;
        $_sth = $_dbh->prepare($statement, @attribs);
    }

    $sth->STORE(NUM_OF_PARAMS => $_sth->FETCH('NUM_OF_PARAMS'));
    $sth->STORE(_dbh => $_dbh);
    $sth->STORE(_sth => $_sth);

    return $outer;
}
109
# Flag the handle as inactive.  Always returns 1.
sub disconnect {
    my $dbh = shift;
    $dbh->STORE(Active => 0);
    return 1;
}
115
# Forward commit to the handle stored under '_dbh'.
#
# Returns that handle's commit result, or nothing when this handle is not
# Active or has no '_dbh' attribute.
sub commit {
    my ($dbh) = @_;

    return unless $dbh->FETCH('Active');
    return unless $dbh->FETCH('_dbh');
    return $dbh->FETCH('_dbh')->commit;
}
123
# Forward rollback to the handle stored under '_dbh'.
#
# Returns that handle's rollback result, or nothing when this handle is not
# Active or has no '_dbh' attribute.
sub rollback {
    my ($dbh) = @_;

    return unless $dbh->FETCH('Active');
    return unless $dbh->FETCH('_dbh');
    return $dbh->FETCH('_dbh')->rollback;
}
131
132
# Store an attribute directly in the handle's hash and return the value.
sub STORE {
    my $self = shift;
    my ($attr, $val) = @_;
    return $self->{$attr} = $val;
}
137
# Disconnect the handle when it is destroyed.
sub DESTROY {
    my ($dbh) = @_;
    $dbh->disconnect;
}
139
140#######################################################################
141package DBD::Multi::st;
142use strict;
143
144$DBD::Multi::st::imp_data_size = 0;
145
use vars qw[@METHODS @FIELDS];

# Statement-handle methods that are forwarded unchanged to the handle
# stored under '_sth' (see the installer loop further down).
@METHODS = (
    qw[ bind_param bind_param_inout bind_param_array ],
    qw[ execute_array execute_for_fetch ],
    qw[ fetch fetchrow_arrayref fetchrow_array fetchrow_hashref ],
    qw[ fetchall_arrayref fetchall_hashref ],
    qw[ bind_col bind_columns ],
    qw[ dump_results ],
);

# Attributes that execute() copies from the '_sth' handle onto this one.
@FIELDS = (
    'NUM_OF_FIELDS',
    'CursorName',
    'ParamValues',
    'RowsInCache',
);
170
# Execute the statement on the handle stored under '_sth'.
#
# Bind values passed here are remembered in f_params; when none are passed
# the previously remembered values (if any) are reused.  A handle that is
# still Active is finished first.  After execution, each attribute named in
# @FIELDS that is defined on the '_sth' handle and not yet defined here is
# copied across.
#
# Returns whatever the '_sth' handle's execute returned.
sub execute {
    my ($sth, @bind_values) = @_;
    my $_sth = $sth->FETCH('_sth');

    $sth->{f_params} = [ @bind_values ] if @bind_values;
    my $params = $sth->{f_params};

    $sth->finish if $sth->FETCH('Active');
    $sth->{Active} = 1;
    my $rc = $_sth->execute(@{$params});

    for my $field ( @FIELDS ) {
        my $value = $_sth->FETCH($field);
        next unless defined $value;
        next if defined $sth->FETCH($field);
        $sth->STORE($field => $value);
    }

    return $rc;
}
191
# Read an attribute, preferring the handle stored under '_sth'.
#
# A true value from that handle wins; any false value falls back to this
# handle's own hash entry.
sub FETCH {
    my ($sth, $attrib) = @_;

    my $delegated = $sth->{'_sth'}->FETCH($attrib);
    return $delegated if $delegated;
    return $sth->{$attrib};
}
196
# Store an attribute directly in the handle's hash and return the value.
sub STORE {
    my $self = shift;
    my ($attr, $val) = @_;
    return $self->{$attr} = $val;
}
201
# Report the row count of the handle stored under '_sth'.
sub rows {
    my ($sth) = @_;
    my $_sth = $sth->FETCH('_sth');
    return $_sth->rows;
}
203
# Mark this handle inactive, then finish the handle stored under '_sth'
# and return its result.
sub finish {
    my $sth = shift;
    $sth->STORE(Active => 0);
    my $_sth = $sth->FETCH('_sth');
    return $_sth->finish;
}
209
# Install one forwarding method per name in @METHODS: each simply calls the
# same-named method on the handle stored under '_sth' with the same arguments.
for my $name ( @METHODS ) {
    no strict 'refs';   # the glob is addressed by name
    *{$name} = sub {
        my $sth = shift;
        return $sth->FETCH('_sth')->$name(@_);
    };
}
214
215#######################################################################
216package DBD::Multi::Handler;
217use strict;
218
219use base qw[Class::Accessor::Fast];
220use Sys::SigAction qw(timeout_call);
221use List::Util qw(shuffle);
222
223=begin ImplementationNotes
224
225dsources - This thing changes from an arrayref to a hashref during construction.  :(
226
227  Initially, when data is passed in during construction, it's an arrayref
228  containing the 'dsns' param from the user's connect() call.
229
230  Later, when _configure_dsources gets called, it turns into a multi-dimension
231  hashref:
232
233       $dsources->{$pri}->{$dsource_id} = 1;
234
235  The first key is the priority number, the second key is the data source index
236  number.  The value is always just a true value.
237
238nextid - A counter.  Stores the index number of the next data source to be added.
239
240all_dsources - A hashref.  Maps index number to the connect data.
241
242current_dsource - The most recent chosen datasource index number.
243
244used - A hashref.  Keys are index numbers.  Values are true when the datasource
245has been previously assigned and we want to prefer other datasources of the
246same priority (for round-robin load distribution).
247
248failed - A hashref.   Keys are index numbers.   Values are counters indicating
249how many times the data source has failed.
250
251failed_last - A hashref.   Keys are index number.   Values are unix timestamp
252indicating the most recent time a data source failed.
253
254failed_max - A scalar value.   Number of times a datasource may fail before we
255stop trying it.
256
257failed_expire - A scalar value.   Number of seconds since we stopped trying a
258datasource before we'll try it again.
259
260timeout - A scalar value.   Number of seconds we try to connect to a datasource
261before giving up.
262
263=end ImplementationNotes
264
265=cut
266
# One accessor per piece of handler state (see ImplementationNotes above).
__PACKAGE__->mk_accessors(
    'dsources',         # priority => { data source index => 1 }
    'nextid',           # index the next added data source will receive
    'all_dsources',     # data source index => connect data
    'current_dsource',  # index of the most recently chosen data source
    'used',             # indexes already handed out in the current round
    'failed',           # data source index => failure count
    'failed_last',      # data source index => time of latest failure
    'failed_max',       # failures allowed before a source is skipped
    'failed_expire',    # seconds before recorded failures are forgotten
    'timeout',          # seconds allowed for a connection attempt
);
279
# Construct a handler.
#
# $args is passed to the parent constructor.  Bookkeeping hashes are always
# reset; nextid, failed_max, failed_expire and timeout keep any value given
# in $args and otherwise default to 0, 3, 300 and 5.  Finally the data
# source list is indexed by _configure_dsources.
sub new {
    my ($class, $args) = @_;
    my $self = $class->SUPER::new($args);

    $self->nextid(0) unless defined $self->nextid;

    # Bookkeeping always starts empty.
    for my $table ( qw[all_dsources used failed failed_last] ) {
        $self->$table({});
    }

    # Tunables: only fill in what the caller left out.
    $self->failed_max(3)         unless defined $self->failed_max;
    $self->failed_expire(60 * 5) unless defined $self->failed_expire;
    $self->timeout(5)            unless defined $self->timeout;

    $self->_configure_dsources;
    return $self;
}
294
# Return the connect data of every registered data source.
sub all_sources {
    my $self = shift;
    my $by_id = $self->all_dsources;
    return values %{$by_id};
}
299
# Register a data source under the given priority.
#
# The source receives the current nextid as its index, is recorded in
# all_dsources and in the priority table, and nextid is advanced by one.
sub add_to_pri {
    my ($self, $pri, $dsource) = @_;
    my $id = $self->nextid;

    $self->all_dsources->{$id}     = $dsource;
    $self->dsources->{$pri}->{$id} = 1;

    return $self->nextid($id + 1);
}
311
# Return a connected handle, trying data sources until one connects.
#
# Every unsuccessful attempt is reported through dbh_failed before the next
# attempt is made.
sub dbh {
    my $self = shift;

    while (1) {
        my $dbh = $self->_connect_dsource;
        return $dbh if $dbh;
        $self->dbh_failed;
    }
}
319
# Record one more failure, time-stamped now, for the current data source.
sub dbh_failed {
    my $self = shift;
    my $id   = $self->current_dsource;

    my ($counts, $stamps) = ($self->failed, $self->failed_last);
    $counts->{$id}++;
    $stamps->{$id} = time;
}
327
# Forget failures whose latest occurrence is more than failed_expire
# seconds in the past.
sub _purge_old_failures {
    my ($self) = @_;
    my $now = time;

    for my $id ( keys %{ $self->all_dsources } ) {
        next unless $self->failed->{$id};

        my $age = $now - $self->failed_last->{$id};
        next unless $age > $self->failed_expire;

        delete $self->failed->{$id};
        delete $self->failed_last->{$id};
    }
}
341
# Choose the data source for the next connection attempt and record its
# index in current_dsource.
#
# Expired failures are purged first.  Priorities are then visited in
# ascending numeric order and the first one that yields a source wins.
# When no priority yields one, the 'used' markers are cleared and the
# search restarts, as long as fewer sources have reached failed_max than
# exist in total.  Otherwise dies with "All data sources failed!".
sub _pick_dsource {
    my ($self) = @_;
    $self->_purge_old_failures;

    my $by_pri = $self->dsources;
    for my $pri ( sort { $a <=> $b } keys %{$by_pri} ) {
        my $picked = $self->_pick_pri_dsource($by_pri->{$pri});
        next unless defined $picked;

        $self->current_dsource($picked);
        return;
    }

    $self->used({});

    my $dead  = grep { $self->failed->{$_} >= $self->failed_max } keys(%{$self->failed});
    my $total = keys(%{$self->all_dsources});
    return $self->_pick_dsource if $dead < $total;

    die("All data sources failed!");
}
361
# Choose one data source from a single priority group.
#
# $dsources is the group's hashref (data source index => 1).  Sources whose
# failure count has reached failed_max are never chosen.  Among the rest, a
# source not yet marked 'used' is picked at random; once every healthy
# source has been used, the group's 'used' markers are cleared and the
# round starts over.
#
# Returns the chosen index (now marked used), or nothing when every source
# in the group has failed, so that the caller escalates to the next
# priority.
sub _pick_pri_dsource {
    my ($self, $dsources) = @_;
    my @dsources   = sort { $a <=> $b } keys %{$dsources};
    my $failed     = $self->failed;
    my $failed_max = $self->failed_max;

    my @healthy = grep {
        !( $failed->{$_} && $failed->{$_} >= $failed_max )
    } @dsources;

    # Everything in this group has failed. Escalate.
    return unless @healthy;

    my @candidates = grep { !$self->used->{$_} } @healthy;

    # Every healthy source has been used. Purge and reuse.  The decision
    # looks at healthy sources only: a failed source that was never marked
    # used must not keep the round from restarting, or the caller would
    # escalate past a working source.
    unless ( @candidates ) {
        delete @{$self->used}{@dsources};
        @candidates = @healthy;
    }

    my ($dsource) = shuffle @candidates;
    $self->used->{$dsource} = 1;
    return $dsource;
}
384
# Turn the flat (priority, data source, priority, data source, ...) list
# held in dsources into the indexed form used afterwards.
#
# dsources is replaced by an empty hashref and each pair is registered
# through add_to_pri.  Processing stops at the first missing or empty
# priority, or at the first pair without a data source.
#
# Works on a copy of the list so the array supplied by the caller is left
# untouched, and accepts 0 as a priority (it used to end the loop).
sub _configure_dsources {
    my ($self) = @_;
    my @pending = @{ $self->dsources || [] };
    $self->dsources({});

    while ( @pending ) {
        my $pri     = shift @pending;
        my $dsource = shift @pending;

        # Compare against undef/empty rather than truth so priority 0 counts.
        last unless defined $pri && length $pri;
        last unless $dsource;

        $self->add_to_pri($pri => $dsource);
    }
    return;
}
395
# Produce a database handle for a data source.
#
# With no $dsource argument, one is chosen through _pick_dsource and looked
# up in all_dsources.  A data source may be:
#   * a ready-made handle (isa DBI::db) - returned as is;
#   * a code ref - called, and its result returned if it is a handle,
#     undef otherwise;
#   * anything else - treated as an arrayref of DBI->connect_cached
#     arguments, attempted under timeout_call with the handler's timeout
#     and with DBI_AUTOPROXY cleared.
#
# Returns the handle, or a false value when no connection was obtained.
sub _connect_dsource {
    my ($self, $dsource) = @_;

    if ( !$dsource ) {
        $self->_pick_dsource;
        $dsource = $self->all_dsources->{$self->current_dsource};
    }

    # Support ready-made handles
    return $dsource if UNIVERSAL::isa($dsource, 'DBI::db');

    # Support code-refs which return handles
    if ( ref $dsource eq 'CODE' ) {
        my $handle = $dsource->();
        # Anything that is not a handle counts as a failed connect.
        return UNIVERSAL::isa($handle, 'DBI::db') ? $handle : undef;
    }

    my $dbh;
    local $ENV{DBI_AUTOPROXY};
    my $connect = sub { $dbh = DBI->connect_cached(@{$dsource}) };
    if ( timeout_call($self->timeout, $connect) ) {
        #warn "Timeout[", $self->current_dsource, "] at ", time, "\n";
    }
    return $dbh;
}
420
# Public wrapper around _connect_dsource; passes $dsource straight through.
sub connect_dsource {
    my $self = shift;
    my ($dsource) = @_;
    return $self->_connect_dsource($dsource);
}
425
# Run $code once for every data source that can be connected.
#
# Each registered data source is connected through connect_dsource; sources
# that yield no handle are skipped.  A handle that carries a 'handler'
# entry has that handler's multi_do_all called instead; otherwise $code is
# invoked with the handle.
sub multi_do_all {
    my ($self, $code) = @_;

    my @sources = values %{$self->all_dsources};

    for my $source ( @sources ) {
        my $dbh = $self->connect_dsource($source) or next;

        if ( $dbh->{handler} ) {
            $dbh->{handler}->multi_do_all($code, $source);
        }
        else {
            $code->($dbh);
        }
    }
}
441
4421;
443__END__
444
445=head1 NAME
446
447DBD::Multi - Manage Multiple Data Sources with Failover and Load Balancing
448
449=head1 SYNOPSIS
450
451  use DBI;
452
453  my $other_dbh = DBI->connect(...);
454
455  my $dbh = DBI->connect( 'dbi:Multi:', undef, undef, {
456      dsns => [ # in priority order
457          10 => [ 'dbi:SQLite:read_one.db', '', '' ],
458          10 => [ 'dbi:SQLite:read_two.db', '', '' ],
459          20 => [ 'dbi:SQLite:master.db',   '', '' ],
460          30 => $other_dbh,
461          40 => sub {  DBI->connect },
462      ],
463      # optional
464      failed_max    => 1,     # short credibility
465      failed_expire => 60*60, # long memory
466      timeout       => 10,    # time out connection attempts after 10 seconds.
467  });
468
469=head1 DESCRIPTION
470
471This software manages multiple database connections for failovers and also
472simple load balancing.  It acts as a proxy between your code and your database
473connections, transparently choosing a connection for each query, based on your
474preferences and present availability of the DB server.
475
476This module is intended for read-only operations (where some other application
477is being used to handle replication).
478
479This software does not prevent write operations from being executed.  This is
480left up to the user. See L<SUGGESTED USES> below for ideas.
481
482The interface is nearly the same as other DBI drivers with one notable
483exception.
484
485=head2 Configuring DSNs
486
487Specify an attribute to the C<connect()> constructor, C<dsns>. This is a list
488of DSNs to configure. The configuration is given in pairs. First comes the
489priority of the DSN. Second is the DSN.
490
491The priorities specify which connections should be used first (lowest to
492highest).  As long as the lowest priority connection is responding, the higher
493priority connections will never be used.  If multiple connections have the same
494priority, then one connection will be chosen randomly for each operation.  Note
495that the random DB is chosen when the statement is prepared.   Therefore
496executing multiple queries on the same prepared statement handle will always
497run on the same connection.
498
The second parameter can be a DBI object, a code ref which returns a DBI object,
or a list of parameters to pass to the DBI C<connect()> constructor.   If a set
of parameters or a code ref is given, then DBD::Multi will be able to attempt
re-connect in the event that the connection is lost.   If a DBI object is used,
then DBD::Multi will give up permanently once that connection is lost.
504
505These connections are lazy loaded, meaning they aren't made until they are
506actually used.
507
508=head2 Configuring Failures
509
510By default, after a data source fails three times, it will not be tried again
511for 5 minutes.  After that period, the data source will be tried again for
512future requests until it reaches its three failure limit (the cycle repeats
513forever).
514
515To change the maximum number of failures allowed before a data source is
516deemed failed, set the C<failed_max> parameter. To change the amount of
517time we remember a data source as being failed, set the C<failed_expire>
518parameter in seconds.
519
520=head2 Timing out connections.
521
522By default, if you attempt to connect to an IP that isn't answering, DBI will
523hang for a very long period of time.   This behavior is not desirable in a
524multi database setup.   Instead, it is better to give up on slow connections
525and move on to other databases quickly.
526
527DBD::Multi will give up on connection attempts after 5 seconds and then try
528another connection.   You may set the C<timeout> parameter to change the
529timeout time, or set it to 0 to disable the timeout feature completely.
530
531=head1 SUGGESTED USES
532
533Here are some ideas on how to use this module effectively and safely.
534
535It is important to remember that C<DBD::Multi> is not intended for read-write
536operations.  One suggestion to prevent accidental write operations is to make
537sure that the user you are connecting to the databases with has privileges
538sufficiently restricted to prevent updates.
539
540Read-write operations should happen through a separate database handle that
541will somehow trigger replication to all of your databases.  For example, your
542read-write handle might be connected to the master server that replicates
543itself to all of the subordinate servers.
544
545Read-only database calls within your application would be updated to explicitly
546use the read-only (DBD::Multi) handle. It is not necessary to find every single
547call that can be load balanced, since they can safely be sent through the
548read/write handle as well.
549
550=head1 TODO
551
552There really isn't much of a TODO list for this module at this time.  Feel free
553to submit a bug report to rt.cpan.org if you think there is a feature missing.
554
555Although there is some code intended for read/write operations, this should be
556considered not supported and not actively developed at this time.  The actual
557read/write code remains un-documented because in the event that I ever do
558decide to work on supporting read/write operations, the API is not guaranteed
559to stay the same.  The focus of this module is presently limited to read-only
560operations.
561
562=head1 TESTING
563
DBD::Multi has its own suite of regression tests.   But, suppose you want to
565verify that you can slip DBD::Multi into whatever application you already have
566written without breaking anything.
567
568Thanks to a feature of DBI, you can regression test DBD::Multi using any
569existing tests that already use DBI without having to update any of your code.
570Simply set the environment variable DBI_AUTOPROXY to 'dbi:Multi:' and then run
571your tests.  DBD::Multi should act as a silent pipe between your application
572and whatever database driver you were previously using.  This will help you
573verify that you aren't currently using some feature of the DBI that breaks
574DBD::Multi (If you are, please do me a favor and submit a bug report so I can
575fix it).
576
577=head1 SEE ALSO
578
579L<CGI::Application::Plugin::DBH> - A plugin for the L<CGI::Application> framework
580which makes it easy to support two database handles, and also supports lazy-loading.
581
582L<DBD::Multiplex>, L<DBIx::HA> - Two modules similar to DBD::Multi, but with
583slightly different objectives.
584
585L<DBI>, L<perl> - You should probably already know about these before using
586this module.
587
588=head1 AUTHOR
589
590Initially written by Casey West and Dan Wright for pair Networks, Inc.
591(www.pair.com)
592
593Maintained by Dan Wright.  <F<DWRIGHT@CPAN.ORG>>.
594
595=cut
596
597