1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5
6package Net::Async::CassandraCQL;
7
8use strict;
9use warnings;
10use 5.010;
11
12our $VERSION = '0.11';
13
14use base qw( IO::Async::Notifier );
15
16use Carp;
17
18use Future::Utils qw( fmap_void try_repeat_until_success );
19use List::Util qw( shuffle );
20use Scalar::Util qw( weaken );
21use Socket qw( inet_ntop getnameinfo AF_INET AF_INET6 NI_NUMERICHOST NIx_NOSERV );
22
23use Protocol::CassandraCQL qw( CONSISTENCY_ONE );
24
25use Net::Async::CassandraCQL::Connection;
26
# Port used when neither the connect() argument nor the configured 'service'
# parameter supplies one
use constant DEFAULT_CQL_PORT => 9042;

# Time after which down nodes will be retried, in seconds (it is compared
# against differences between time() values)
use constant NODE_RETRY_TIME => 60;
31
32=head1 NAME
33
34C<Net::Async::CassandraCQL> - use Cassandra databases with L<IO::Async> using CQL
35
36=head1 SYNOPSIS
37
38 use IO::Async::Loop;
39 use Net::Async::CassandraCQL;
40 use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );
41
42 my $loop = IO::Async::Loop->new;
43
44 my $cass = Net::Async::CassandraCQL->new(
45    host => "localhost",
46    keyspace => "my-keyspace",
47    default_consistency => CONSISTENCY_QUORUM,
48 );
49 $loop->add( $cass );
50
51
52 $cass->connect->get;
53
54
55 my @f;
56 foreach my $number ( 1 .. 100 ) {
    push @f, $cass->query( "INSERT INTO numbers (v) VALUES ($number)" );
58 }
59 Future->needs_all( @f )->get;
60
61
62 my $get_stmt = $cass->prepare( "SELECT v FROM numbers" )->get;
63
64 my ( undef, $result ) = $get_stmt->execute( [] )->get;
65
66 foreach my $row ( $result->rows_hash ) {
67    say "We have a number " . $row->{v};
68 }
69
70=head1 DESCRIPTION
71
72This module allows use of the C<CQL3> interface of a Cassandra database. It
73fully supports asynchronous operation via L<IO::Async>, allowing both direct
74queries and prepared statements to be managed concurrently, if required.
75Alternatively, as the interface is entirely based on L<Future> objects, it can
76be operated synchronously in a blocking fashion by simply awaiting each
77individual operation by calling the C<get> method.
78
79It is based on L<Protocol::CassandraCQL>, which more completely documents the
80behaviours and limits of its ability to communicate with Cassandra.
81
82=cut
83
84=head1 EVENTS
85
86=head2 on_node_up $nodeid
87
88=head2 on_node_down $nodeid
89
90The node's status has changed. C<$nodeid> is the node's IP address as a text
91string.
92
93=head2 on_node_new $nodeid
94
95=head2 on_node_removed $nodeid
96
97A new node has been added to the cluster, or an existing node has been
98decommissioned and removed.
99
100These four events are obtained from event watches on the actual node
101connections and filtered to remove duplicates. The use of multiple primaries
102should improve the reliability of notifications, though if multiple nodes fail
103at or around the same time this may go unreported, as no node will ever report
104its own failure.
105
106=cut
107
108=head1 PARAMETERS
109
110The following named parameters may be passed to C<new> or C<configure>:
111
112=over 8
113
114=item host => STRING
115
116=item hosts => ARRAY of STRING
117
The hostnames of the Cassandra nodes to connect to initially. If more than one
host is provided in an array, they will be attempted sequentially until one
succeeds during the initial connect phase.
121
122=item service => STRING
123
124Optional. The service name or port number to connect to.
125
126=item username => STRING
127
128=item password => STRING
129
130Optional. Authentication details to use for C<PasswordAuthenticator>.
131
132=item keyspace => STRING
133
134Optional. If set, a C<USE keyspace> query will be issued as part of the
135connect method.
136
137=item default_consistency => INT
138
139Optional. Default consistency level to use if none is provided to C<query> or
140C<execute>.
141
142=item primaries => INT
143
144Optional. The number of primary node connections to maintain. Defaults to 1 if
145not specified.
146
147=item prefer_dc => STRING
148
149Optional. If set, prefer to pick primary nodes from the given data center,
150only falling back on others if there are not enough available.
151
152=item cql_version => INT
153
154Optional. Version of the CQL wire protocol to negotiate during connection.
155Defaults to 1.
156
157=back
158
159=cut
160
sub _init
{
   my $self = shift;
   my ( $params ) = @_;

   # A single primary connection unless the caller asks for more
   $params->{primaries} = 1 unless defined $params->{primaries};

   # Build the event callbacks exactly once, so that every connection chosen
   # as an event watcher is configured with the same code refs. Each is
   # wrapped by _replace_weakself so it does not hold $self strongly.
   my $topology_cb = $self->_replace_weakself( sub {
      my $weakself = shift;
      $weakself->_on_topology_change( @_ );
   });
   my $status_cb = $self->_replace_weakself( sub {
      my $weakself = shift;
      $weakself->_on_status_change( @_ );
   });
   $self->{on_topology_change_cb} = $topology_cb;
   $self->{on_status_change_cb}   = $status_cb;

   # Prepared-statement cache: {$cql} => [ $query, $expire_timer_f ]
   # An entry is in one of two states:
   #   $query held weakly, timer undef       => in normal use by the caller
   #   $query held strongly, timer pending   => unused, due to expire soon
   $self->{queries_by_cql} = {};

   # CQL protocol version used unless configured otherwise
   $self->{cql_version} = 1;

   return $self->SUPER::_init( $params );
}
185
sub configure
{
   my $self = shift;
   my %params = @_;

   # 'host' is shorthand for a single-element 'hosts'. The key is always
   # removed: if it were left behind (because 'hosts' was also given, or the
   # value was undef) the superclass would reject it as an unrecognised
   # configuration key. An explicit 'hosts' takes precedence.
   if( exists $params{host} ) {
      my $host = delete $params{host};
      $params{hosts} ||= [ $host ] if defined $host;
   }

   # Plain settings stored directly on the object
   foreach my $key (qw( hosts service username password keyspace default_consistency
                        prefer_dc cql_version
                        on_node_up on_node_down on_node_new on_node_removed )) {
      $self->{$key} = delete $params{$key} if exists $params{$key};
   }

   if( exists $params{primaries} ) {
      $self->{primaries} = delete $params{primaries};

      # TODO: connect more / drain old ones
   }

   $self->SUPER::configure( %params );
}
209
210=head1 METHODS
211
212=cut
213
214# function
# Converts a packed binary IP address into its printable text form; the
# address family is inferred from the packed length.
sub _inet_to_string
{
   my ( $packed ) = @_;

   # 4 bytes is an IPv4 address, 16 bytes an IPv6 one; anything else is fatal
   my %family_for_length = ( 4 => AF_INET, 16 => AF_INET6 );
   my $family = $family_for_length{ length $packed }
      // die "Expected ADDRLEN 4 or 16";

   return inet_ntop( $family, $packed );
}
225
226# function
# Converts a packed socket address into the numeric host string used as a
# node id (no reverse DNS, no service name).
sub _nodeid_to_string
{
   my ( $sockaddr ) = @_;

   # getnameinfo yields ( $err, $host, $service ); only the host is wanted
   my @reply = getnameinfo( $sockaddr, NI_NUMERICHOST, NIx_NOSERV );
   return @reply > 1 ? $reply[1] : ();
}
233
234=head2 $str = $cass->quote( $str )
235
236Quotes a string argument suitable for inclusion in an immediate CQL query
237string.
238
239In general, it is better to use a prepared query and pass the value as an
240execute parameter though.
241
242=cut
243
sub quote
{
   my $self = shift;
   my ( $str ) = @_;

   # In a CQL string literal every character stands for itself except the
   # single quote, which is escaped by doubling it
   ( my $escaped = $str ) =~ s/'/''/g;

   return "'" . $escaped . "'";
}
254
255=head2 $str = $cass->quote_identifier( $str )
256
257Quotes an identifier name suitable for inclusion in a CQL query string.
258
259=cut
260
sub quote_identifier
{
   my $self = shift;
   my ( $str ) = @_;

   # A plain lowercase identifier (including a single character) needs no
   # quoting. Anchor with \A...\z rather than ^...$ so that a trailing newline
   # is not mistaken for a safe identifier and spliced raw into the query.
   return $str if $str =~ m/\A[a-z_][a-z0-9_]*\z/;

   # Within a double-quoted CQL identifier every character stands for itself
   # except the double quote, which is escaped by doubling it
   $str =~ s/"/""/g;
   return qq("$str");
}
273
274=head2 $cass->connect( %args ) ==> ()
275
276Connects to the Cassandra node and starts up the connection. The returned
277Future will yield nothing on success.
278
279Takes the following named arguments:
280
281=over 8
282
283=item host => STRING
284
285=item hosts => ARRAY of STRING
286
287=item service => STRING
288
289=back
290
A set of host names is required, either as a named argument or as a
configured value on the object. If no service is given here or configured on
the object, the default CQL port will be used instead.
294
295=cut
296
297# ->_connect_node( $host, $service ) ==> $conn
298# mocked during unit testing
sub _connect_node
{
   my $self = shift;
   my ( $host, $service ) = @_;

   # Explicit argument, else the configured service, else the CQL default
   $service //= $self->{service} // DEFAULT_CQL_PORT;

   # The connection is added as our child and holds on to its on_closed
   # callback; capturing $self only weakly avoids a
   # $self -> child -> callback -> $self reference cycle that would keep
   # this object alive after its owner has dropped it.
   weaken( my $weakself = $self );

   my $conn = Net::Async::CassandraCQL::Connection->new(
      on_closed => sub {
         my $node = shift;
         my $self = $weakself or return;
         $self->remove_child( $node );
         $self->_closed_node( $node->nodeid );
      },
      map { $_ => $self->{$_} } qw( username password cql_version ),
   );
   $self->add_child( $conn );

   # Yields the connection itself on success
   $conn->connect(
      host    => $host,
      service => $service,
   )->on_fail( sub {
      my $self = $weakself or return;
      # Some kinds of failure have already removed it
      $self->remove_child( $conn ) if $conn->parent;
   });
}
324
325# invoked during unit testing
sub _closed_node
{
   my $self = shift;
   my ( $nodeid ) = @_;

   my $now = time();

   # After close_when_idle/close_now there is no node table: nothing to do
   my $nodes = $self->{nodes} or return;
   my $node  = $nodes->{$nodeid} or return;

   # Forget the connection and remember when the node went down, so that
   # _pick_new_primary will not choose it again until NODE_RETRY_TIME passes
   $node->{conn}      = undef;
   $node->{ready_f}   = undef;
   $node->{down_time} = $now;

   if( exists $self->{primary_ids}{$nodeid} ) {
      $self->debug_printf( "PRIMARY DOWN %s", $nodeid );
      delete $self->{primary_ids}{$nodeid};

      # Replace the lost primary
      $self->_pick_new_primary( $now );
   }

   # If this node was receiving cluster events, nominate another primary
   return unless exists $self->{event_ids}{$nodeid};

   delete $self->{event_ids}{$nodeid};
   $self->_pick_new_eventwatch;
}
353
# Returns every known node id in random order; when prefer_dc is set, ids in
# that data center come first (each group still shuffled).
sub _list_nodeids
{
   my $self = shift;

   my $nodes = $self->{nodes};

   my @nodeids = shuffle keys %$nodes;

   my $dc = $self->{prefer_dc};
   return @nodeids unless defined $dc;

   # Single-pass partition, preserving shuffled order within each group. A
   # node whose details are still being fetched (the NEW_NODE placeholder)
   # has no data_center yet; it counts as non-preferred without tripping an
   # "uninitialized value" warning.
   my ( @preferred, @others );
   foreach my $nodeid ( @nodeids ) {
      my $node_dc = $nodes->{$nodeid}{data_center};
      if( defined $node_dc and $node_dc eq $dc ) {
         push @preferred, $nodeid;
      }
      else {
         push @others, $nodeid;
      }
   }

   @nodeids = ( @preferred, @others );
   return @nodeids;
}
369
# Connects to the first reachable host (from %args, else the configured
# hosts), reads the cluster's node list from it and makes it the first
# primary. Further primaries are then connected up to the 'primaries'
# parameter, and one primary (two, when there are several) is chosen to
# watch for cluster events. The returned future completes when the primary
# connections are ready.
sub connect
{
   my $self = shift;
   my %args = @_;

   # The seed connection: set by the first ->then below, used by the second
   my $conn;

   # An explicit list of hosts, else a single host, else the configured ones
   my @hosts = $args{hosts}        ? @{ $args{hosts} } :
               defined $args{host} ? ( $args{host} ) :
                                     @{ $self->{hosts} || [] };
   @hosts or croak "Require initial hostnames to ->connect to";

   # Try each host in turn until one connection succeeds
   ( try_repeat_until_success {
      $self->_connect_node( $_[0], $args{service} )
   } foreach => \@hosts )->then( sub {
      ( $conn ) = @_;
      # Yields the seed node's own details followed by those of its peers
      $self->_list_nodes( $conn );
   })->then( sub {
      my @nodes = @_;

      # Build the node table keyed by host address; the seed's entry keeps
      # the connection we already have
      $self->{nodes} = \my %nodes;
      foreach my $node ( @nodes ) {
         my $n = $nodes{$node->{host}} = {
            data_center => $node->{data_center},
            rack        => $node->{rack},
         };

         if( $node->{host} eq $conn->nodeid ) {
            $n->{conn} = $conn;
         }
      }

      # Initial primary on the seed
      $self->{primary_ids} = {
         $conn->nodeid => 1,
      };
      my $primary0 = $nodes{$conn->nodeid};
      my $have_primaries = 1;

      # Ready futures of all primaries; each yields its node's connection
      my @conn_f;

      $self->debug_printf( "PRIMARY PICKED %s", $conn->nodeid );
      push @conn_f, $primary0->{ready_f} = $self->_ready_node( $conn->nodeid );

      # Add further primaries in _list_nodeids order (shuffled, preferred
      # data center first), skipping the seed, until there are enough
      my @nodeids = $self->_list_nodeids;

      while( @nodeids and $have_primaries < $self->{primaries} ) {
         my $primary = shift @nodeids;
         next if $primary eq $conn->nodeid;

         push @conn_f, $self->_connect_new_primary( $primary );
         $have_primaries++;
      }

      # Watch for cluster events on one primary, and on a second one as
      # well when there are several, for more reliable notifications
      $self->_pick_new_eventwatch;
      $self->_pick_new_eventwatch if $have_primaries > 1;

      return $conn_f[0] if @conn_f == 1;
      return Future->needs_all( @conn_f );
   });
}
431
sub _pick_new_primary
{
   my $self = shift;
   my ( $now ) = @_;

   my $nodes = $self->{nodes};
   my $chosen;

   # Visit every node (preferred data center first, via _list_nodeids).
   # Along the way, clear any down mark older than NODE_RETRY_TIME, and
   # remember the first node that is neither a primary already nor down.
   for my $candidate ( $self->_list_nodeids ) {
      my $info = $nodes->{$candidate};

      if( defined $info->{down_time} and $now - $info->{down_time} > NODE_RETRY_TIME ) {
         delete $info->{down_time};
      }

      next if $self->{primary_ids}{$candidate};
      next if $info->{down_time};

      $chosen = $candidate unless $chosen;
   }

   defined $chosen or die "ARGH! TODO: can't find a new node to be primary\n";

   return $self->_connect_new_primary( $chosen );
}
459
# Marks $new_primary as a primary, starts connecting to it, and returns (and
# stores as the node's {ready_f}) a future yielding its connection once it
# has been readied by _ready_node.
sub _connect_new_primary
{
   my $self = shift;
   my ( $new_primary ) = @_;

   $self->debug_printf( "PRIMARY PICKED %s", $new_primary );

   # Counted as a primary immediately, before the connection is up
   $self->{primary_ids}{$new_primary} = 1;

   my $node = $self->{nodes}{$new_primary};

   my $connected_f = $self->_connect_node( $new_primary );
   my $ready_f = $connected_f->then( sub {
      my ( $conn ) = @_;
      $node->{conn} = $conn;

      return $self->_ready_node( $new_primary );
   });

   $ready_f->on_fail( sub {
      print STDERR "ARGH! NEW PRIMARY FAILED: @_\n";
   });
   $ready_f->on_done( sub {
      $self->debug_printf( "PRIMARY UP %s", $new_primary );
   });

   return $node->{ready_f} = $ready_f;
}
481
# Brings a freshly connected node into service: switches it to the configured
# keyspace, then prepares every cached statement on it. The returned future
# yields the node's connection.
sub _ready_node
{
   my $self = shift;
   my ( $nodeid ) = @_;

   my $node = $self->{nodes}{$nodeid} or die "Don't have a node id $nodeid";
   my $conn = $node->{conn} or die "Expected node to have a {conn} but it doesn't";

   my $keyspace = $self->{keyspace};
   my $keyspace_f;
   if( $keyspace ) {
      $keyspace_f = $conn->query( "USE " . $self->quote_identifier( $keyspace ), CONSISTENCY_ONE );
   }
   else {
      $keyspace_f = Future->new->done;
   }

   return $keyspace_f->then( sub {
      my $conn_f = Future->new->done( $conn );

      my $queries_by_cql = $self->{queries_by_cql};
      return $conn_f unless $queries_by_cql and %$queries_by_cql;

      # Prepare the cached statements one at a time, so that statements
      # prepared earlier can also be executed on this connection
      my $prepare_all_f = fmap_void {
         # Cache entries hold the query weakly; it may already be gone
         my $query = $_[0]->[0];
         return Future->new->done unless $query;

         return $conn->prepare( $query->cql, $self );
      } foreach => [ values %$queries_by_cql ];

      return $prepare_all_f->then( sub { $conn_f } );
   });
}
508
# Chooses a random primary that is not yet receiving cluster events and,
# once its connection is ready, registers it for TOPOLOGY_CHANGE and
# STATUS_CHANGE events routed to this object. Does nothing if every primary
# is already an event watcher, or if there are no primaries.
sub _pick_new_eventwatch
{
   my $self = shift;

   # Pick only among primaries that are not already watching. Retrying random
   # picks until an unused one turns up would spin forever once every primary
   # is covered, and would dereference undef with no primaries at all.
   my $primary_ids = $self->{primary_ids} || {};
   my @candidates = grep { !$self->{event_ids}{$_} } keys %$primary_ids;
   return unless @candidates;

   my $nodeid = $candidates[rand @candidates];
   $self->{event_ids}{$nodeid} = 1;

   my $node = $self->{nodes}{$nodeid};
   $node->{ready_f}->on_done( sub {
      my $conn = shift;
      $conn->configure(
         on_topology_change => $self->{on_topology_change_cb},
         on_status_change   => $self->{on_status_change_cb},
      );
      # If registration fails, hand the job over to another primary
      $conn->register( [qw( TOPOLOGY_CHANGE STATUS_CHANGE )] )
         ->on_fail( sub {
            delete $self->{event_ids}{$nodeid};
            $self->_pick_new_eventwatch
         });
   });

   return;
}
536
sub _on_topology_change
{
   my $self = shift;
   my ( $type, $addr ) = @_;
   my $nodeid = _nodeid_to_string( $addr );

   # Events may still arrive on a draining connection after close_when_idle
   # has discarded the node table. There is nothing left to update then, and
   # querying for a new node's details would die with "No available nodes"
   # inside the event callback.
   my $nodes = $self->{nodes} or return;

   # These updates can happen twice if there's two event connections but
   # that's OK. Use the state to ensure printing only once

   if( $type eq "NEW_NODE" ) {
      return if exists $nodes->{$nodeid};

      # Placeholder, so a duplicate event from another watcher is ignored
      # while this node's details are being fetched
      my $placeholder = $nodes->{$nodeid} = {};

      my $f = $self->query_rows( "SELECT peer, data_center, rack FROM system.peers WHERE peer = " . $self->quote( $nodeid ), CONSISTENCY_ONE )
         ->on_done( sub {
            my ( $result ) = @_;

            # No row for this peer means there are no details to record;
            # leave the placeholder rather than decode a row that isn't there
            return unless $result->rows;

            my $node = $result->row_hash( 0 );
            $node->{host} = _inet_to_string( delete $node->{peer} );

            # Fill in the placeholder itself instead of looking the id up
            # again, so a node removed in the meantime is not resurrected
            %$placeholder = %$node;
            return unless $nodes->{$nodeid} and $nodes->{$nodeid} == $placeholder;

            $self->debug_printf( "NEW_NODE {%s}", $nodeid );
            $self->maybe_invoke_event( on_node_new => $nodeid );
         });

      # Intentional cycle: $f keeps itself alive until the query completes
      $f->on_ready( sub { undef $f } );
   }
   elsif( $type eq "REMOVED_NODE" ) {
      return if !exists $nodes->{$nodeid};

      delete $nodes->{$nodeid};
      $self->debug_printf( "REMOVED_NODE {%s}", $nodeid );
      $self->maybe_invoke_event( on_node_removed => $nodeid );
   }
}
576
sub _on_status_change
{
   my $self = shift;
   my ( $status, $addr ) = @_;
   my $nodeid = _nodeid_to_string( $addr );

   my $nodes = $self->{nodes};
   my $node = $nodes->{$nodeid} or return;

   # These updates can happen twice if there's two event connections but
   # that's OK. Use the state to ensure printing only once

   if( $status eq "DOWN" ) {
      return if exists $node->{down_time};

      $self->debug_printf( "STATUS DOWN on {%s}", $nodeid );
      $self->maybe_invoke_event( on_node_down => $nodeid );

      $node->{down_time} = time();
   }
   elsif( $status eq "UP" ) {
      return if !exists $node->{down_time};

      $self->debug_printf( "STATUS UP on {%s}", $nodeid );
      $self->maybe_invoke_event( on_node_up => $nodeid );

      delete $node->{down_time};

      return unless defined( my $dc = $self->{prefer_dc} );
      # A node whose details are still being fetched has no data_center yet
      return unless defined $node->{data_center} and $node->{data_center} eq $dc;
      return if $node->{conn};

      # A node in a preferred data center is now up, and we don't already have
      # a connection to it. Find an established primary outside that data
      # center to hand over from. Primaries still connecting are skipped: they
      # have no {conn} to close yet, and dropping one from primary_ids would
      # orphan the connection it eventually makes.
      my ( $old_nodeid, $old_conn );
      foreach my $primary_id ( keys %{ $self->{primary_ids} } ) {
         my $primary = $nodes->{$primary_id} or next;
         my $conn = $primary->{conn} or next;
         next if defined $primary->{data_center} and $primary->{data_center} eq $dc;

         ( $old_nodeid, $old_conn ) = ( $primary_id, $conn );
         last;
      }

      return unless defined $old_nodeid;

      # We do have a connection to a non-preferred node, so lets switch it

      $self->_connect_new_primary( $nodeid );

      # Don't pick it for new nodes
      $self->debug_printf( "PRIMARY SWITCH %s -> %s", $old_nodeid, $nodeid );
      delete $self->{primary_ids}{$old_nodeid};

      # Close it when it's empty
      $old_conn->close_when_idle;
   }
}
629
630=head2 $cass->close_when_idle ==> $cass
631
632Stops accepting new queries and prepares all the existing connections to be
633closed once every outstanding query has been responded to. Returns a future
634that will eventually yield the CassandraCQL object, when all the connections
635are closed.
636
637After calling this method it will be an error to invoke C<query>, C<prepare>,
638C<execute> or the various other methods derived from them.
639
640=cut
641
sub close_when_idle
{
   my $self = shift;

   my $nodes = $self->{nodes};

   # Drop the node table and primary set first. As connections close,
   # _closed_node then finds no nodes and so makes no reconnection attempt.
   $self->{nodes}       = undef;
   $self->{primary_ids} = undef;

   # Ask each live connection to close once its outstanding queries finish
   my @closing_f;
   foreach my $node ( values %$nodes ) {
      my $conn = $node->{conn} or next;
      push @closing_f, $conn->close_when_idle;
   }

   # Resolve with $self once every connection has finished closing
   my $all_closed_f = Future->wait_all( @closing_f );
   return $all_closed_f->then( sub {
      return Future->new->done( $self );
   });
}
662
663=head2 $cass->close_now
664
665Immediately closes all node connections and shuts down the object. Any
666outstanding or queued queries will immediately fail. Consider this as a "last
667resort" failure shutdown, as compared to the graceful draining behaviour of
668C<close_when_idle>.
669
670=cut
671
sub close_now
{
   my $self = shift;

   my $nodes = $self->{nodes};

   # Discard the node table before closing anything, so the resulting
   # on_closed callbacks find no nodes and do not try to reconnect
   $self->{nodes}       = undef;
   $self->{primary_ids} = undef;

   my @conns = grep { $_ } map { $_->{conn} } values %$nodes;
   $_->close_now for @conns;
}
687
# Picks a primary node in round-robin order and returns its ready future
# (which yields the node's connection). A primary whose ready future has
# already completed is preferred; if none has, the one at the cursor is used.
sub _get_a_node
{
   my $self = shift;

   my $nodes = $self->{nodes} or die "No available nodes";

   # TODO: Other sorting strategies;
   #   e.g. fewest outstanding queries, least accumulated time recently
   my @nodeids = keys %{ $self->{primary_ids} }
      or die "ARGH: $self -> _get_a_node called with no defined primaries";
   my $count = @nodeids;

   # Reduce the saved cursor modulo the current count: primaries may have
   # been dropped since it was stored, and an out-of-range cursor would
   # otherwise produce undef node ids.
   my $start = ( $self->{next_primary} // 0 ) % $count;

   my $chosen = $start;
   foreach my $offset ( 0 .. $count - 1 ) {
      my $idx  = ( $start + $offset ) % $count;
      my $node = $nodes->{ $nodeids[$idx] } or next;
      next unless $node->{ready_f} and $node->{ready_f}->is_ready;

      $chosen = $idx;
      last;
   }

   # Resume after the chosen primary next time
   $self->{next_primary} = ( $chosen + 1 ) % $count;

   my $node = $nodes->{ $nodeids[$chosen] };
   return $node->{ready_f} if $node and $node->{ready_f};

   die "ARGH: don't have a primary node";
}
721
722=head2 $cass->query( $cql, $consistency, %other_args ) ==> ( $type, $result )
723
724Performs a CQL query. On success, the values returned from the Future will
725depend on the type of query.
726
727For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving
728the name of the new keyspace.
729
730For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and
731C<$result> is a 3-element ARRAY reference containing the type of change, the
732keyspace and the table name.
733
734For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of
735L<Protocol::CassandraCQL::Result> containing the returned row data.
736
737For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the future
738returns nothing.
739
740C<%other_args> may be any of the following, when using C<cql_version> 2 or
741above:
742
743=over 8
744
745=item skip_metadata => BOOL
746
747Requests the server does not include result metadata in the response. It will
748be up to the caller to provide this, via C<set_metadata> on the returned
749Result object, before it can be used.
750
751=item page_size => INT
752
753Requests that the server returns at most the given number of rows. If any
754further remain, the result object will include the C<paging_state> field. This
755can be passed in another C<query> call to obtain the next set of data.
756
757=item paging_state => INT
758
759Requests that the server continues a paged request from this position, given
760in a previous response.
761
762=item serial_consistency => INT
763
764Sets the consistency level for serial operations in the query. Must be one of
765C<CONSISTENCY_SERIAL> or C<CONSISTENCY_LOCAL_SERIAL>.
766
767=back
768
769=cut
770
# Returns $f unchanged. When IO::Async debugging is enabled, also attaches a
# callback that logs a one-line summary of how the operation $op finished.
sub _debug_wrap_result
{
   my ( $op, $self, $f ) = @_;

   return $f unless $IO::Async::Notifier::DEBUG;

   $f->on_ready( sub {
      my $f = shift;
      if( $f->is_cancelled ) {
         # A cancelled future has neither result nor failure, and ->get would
         # throw from inside this callback
         $self->debug_printf( "$op => CANCELLED" );
      }
      elsif( $f->failure ) {
         $self->debug_printf( "$op => FAIL %s", scalar $f->failure );
      }
      elsif( my ( $type, $result ) = $f->get ) {
         # Summarise the bulkier result types rather than printing them whole
         if( $type eq "rows" ) {
            $result = sprintf "%d x %d columns", $result->rows, $result->columns;
         }
         elsif( $type eq "schema_change" ) {
            $result = sprintf "%s %s", $result->[0], join ".", @{$result}[1..$#$result];
         }
         $self->debug_printf( "$op => %s %s", uc $type, $result );
      }
      else {
         $self->debug_printf( "$op => VOID" );
      }
   });

   return $f;
}
796
sub query
{
   my $self = shift;
   my ( $cql, $consistency, %other_args ) = @_;

   # Fall back on the configured default; one or the other is mandatory
   $consistency = $self->{default_consistency} unless defined $consistency;
   croak "'query' needs a consistency level" unless defined $consistency;

   # Run the query on whichever primary connection _get_a_node selects
   my $result_f = $self->_get_a_node->then( sub {
      my ( $conn ) = @_;
      $self->debug_printf( "QUERY on {%s}: %s", $conn->nodeid, $cql );
      return $conn->query( $cql, $consistency, %other_args );
   });

   return _debug_wrap_result( QUERY => $self, $result_f );
}
811
812=head2 $cass->query_rows( $cql, $consistency, %other_args ) ==> $result
813
A shortcut wrapper for C<query> which expects a C<rows> result; any other
result is treated as an error. On success the returned Future yields the
C<Protocol::CassandraCQL::Result> object directly.
817
818=cut
819
# Runs ->query and unwraps a 'rows' result; any other result type (including
# a void one) fails the returned future with "Expected 'rows' result".
sub query_rows
{
   my $self = shift;

   return $self->query( @_ )->then( sub {
      my ( $type, $result ) = @_;

      # The failed future must be returned from this callback for the
      # caller's future to fail; a void result has no $type at all
      return Future->new->fail( "Expected 'rows' result" )
         unless defined $type and $type eq "rows";

      return Future->new->done( $result );
   });
}
830
831=head2 $cass->prepare( $cql ) ==> $query
832
833Prepares a CQL query for later execution. On success, the returned Future
834yields an instance of a prepared query object (see below).
835
Query objects are stored internally, cached by their CQL string; subsequent
calls to C<prepare> with the same exact CQL string will yield the same object
immediately, saving a roundtrip.
839
840=cut
841
sub prepare
{
   my $self = shift;
   my ( $cql ) = @_;

   my $nodes = $self->{nodes} or die "No available nodes";

   my $queries_by_cql = $self->{queries_by_cql};

   # Cache hit: hand back the existing query object. If it was counting down
   # to expiry, stop the timer and return to holding it only weakly.
   if( my $cached = $queries_by_cql->{$cql} ) {
      my ( $query, $expire_f ) = @$cached;
      if( $expire_f ) {
         $expire_f->cancel;
         $cached->[1] = undef;
         weaken( $cached->[0] );
      }
      return Future->new->done( $query );
   }

   $self->debug_printf( "PREPARE %s", $cql );

   # Prepare on every primary connection, so whichever one is later chosen
   # for execute already knows the statement
   my @prepare_f;
   foreach my $nodeid ( keys %{ $self->{primary_ids} } ) {
      # NOTE(review): a primary that is still connecting has no {conn} yet,
      # which would make this call die - confirm whether that can occur here
      push @prepare_f, $nodes->{$nodeid}{conn}->prepare( $cql, $self );
   }

   return Future->needs_all( @prepare_f )->then( sub {
      # Keep the first object only; the others all have the same ID anyway
      my $query = shift;

      $self->debug_printf( "PREPARE => [%s]", unpack "H*", $query->id );

      # Cache it weakly, so the cache alone does not keep the query alive
      my $entry = [ $query, undef ];
      weaken( $entry->[0] );
      $queries_by_cql->{$cql} = $entry;

      return Future->new->done( $query );
   });
}
880
# Starts the expiry countdown for the cached prepared statement of $cql.
# Presumably called as the query object is being destroyed (see the DESTROY
# note below; TODO confirm against the query class). Rather than forgetting
# the statement at once, it takes a strong reference back to the query and
# keeps it cached for another 60 seconds, so that a prepare() of the same CQL
# within that window can reuse it.
sub _expire_query
{
   my $self = shift;
   my ( $cql ) = @_;

   my $queries_by_cql = $self->{queries_by_cql};
   my $q = $queries_by_cql->{$cql} or return;

   # Copy out, clear, then store back: the stored reference is now a fresh
   # strong (non-weak) reference to the same query object
   my $query = $q->[0]; undef $q->[0]; $q->[0] = $query; # unweaken

   # prepare() cancels this timer if the same CQL is requested again in time
   $q->[1] = $self->loop->delay_future( after => 60 )
      ->on_done( sub {
         # Remove the {cassandra} element from the query so it doesn't
         # re-register itself for expiry when it is DESTROYed again
         undef $q->[0]{cassandra};
         delete $queries_by_cql->{$cql};
      });
}
899
900=head2 $cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result )
901
902Executes a previously-prepared statement, given the binding data. On success,
903the returned Future will yield results of the same form as the C<query>
904method. C<$data> should contain a list of encoded byte-string values.
905
906Normally this method is not directly required - instead, use the C<execute>
907method on the query object itself, as this will encode the parameters
908correctly.
909
910C<%other_args> may be as for the C<query> method.
911
912=cut
913
sub execute
{
   my $self = shift;
   my ( $query, $data, $consistency, %other_args ) = @_;

   # Fall back on the configured default; one or the other is mandatory
   $consistency = $self->{default_consistency} unless defined $consistency;
   croak "'execute' needs a consistency level" unless defined $consistency;

   # Execute by statement ID on whichever primary _get_a_node selects
   my $result_f = $self->_get_a_node->then( sub {
      my ( $conn ) = @_;
      $self->debug_printf( "EXECUTE on {%s}: %s [%s]", $conn->nodeid, $query->cql, unpack "H*", $query->id );
      return $conn->execute( $query->id, $data, $consistency, %other_args );
   });

   return _debug_wrap_result( EXECUTE => $self, $result_f );
}
928
929=head1 CONVENIENT WRAPPERS
930
931The following wrapper methods all wrap the basic C<query> operation.
932
933=cut
934
935=head2 $cass->schema_keyspaces ==> $result
936
937A shortcut to a C<SELECT> query on C<system.schema_keyspaces>, which returns a
938result object listing all the keyspaces.
939
940Exact details of the returned columns will depend on the Cassandra version,
941but the result should at least be keyed by the first column, called
942C<keyspace_name>.
943
944 my $keyspaces = $result->rowmap_hash( "keyspace_name" )
945
946=cut
947
sub schema_keyspaces
{
   my $self = shift;

   # Every row of the keyspace schema table, read at consistency ONE
   my $cql = "SELECT * FROM system.schema_keyspaces";
   return $self->query_rows( $cql, CONSISTENCY_ONE );
}
957
958=head2 $cass->schema_columnfamilies( $keyspace ) ==> $result
959
960A shortcut to a C<SELECT> query on C<system.schema_columnfamilies>, which
961returns a result object listing all the columnfamilies of the given keyspace.
962
963Exact details of the returned columns will depend on the Cassandra version,
964but the result should at least be keyed by the first column, called
965C<columnfamily_name>.
966
967 my $columnfamilies = $result->rowmap_hash( "columnfamily_name" )
968
969=cut
970
sub schema_columnfamilies
{
   my $self = shift;
   my ( $keyspace ) = @_;

   # Column families of one keyspace; the name is quoted as a string literal
   my $quoted_keyspace = $self->quote( $keyspace );

   return $self->query_rows(
      "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name = " . $quoted_keyspace,
      CONSISTENCY_ONE,
   );
}
981
982=head2 $cass->schema_columns( $keyspace, $columnfamily ) ==> $result
983
984A shortcut to a C<SELECT> query on C<system.schema_columns>, which returns a
985result object listing all the columns of the given columnfamily.
986
987Exact details of the returned columns will depend on the Cassandra version,
988but the result should at least be keyed by the first column, called
989C<column_name>.
990
991 my $columns = $result->rowmap_hash( "column_name" )
992
993=cut
994
sub schema_columns
{
   my $self = shift;
   my ( $keyspace, $columnfamily ) = @_;

   # Columns of one column family; both names are quoted as string literals
   my $quoted_keyspace     = $self->quote( $keyspace );
   my $quoted_columnfamily = $self->quote( $columnfamily );

   my $cql = "SELECT * FROM system.schema_columns WHERE keyspace_name = " . $quoted_keyspace
           . " AND columnfamily_name = " . $quoted_columnfamily;

   return $self->query_rows( $cql, CONSISTENCY_ONE );
}
1005
# Asks $conn for the cluster membership. The returned future yields one
# hash per node (data_center, rack, host), the connected node's own first.
# It fails with "Expected 'rows' result" if either query returns anything
# other than rows.
sub _list_nodes
{
   my $self = shift;
   my ( $conn ) = @_;

   # The system.peers table doesn't include the node we actually connect to.
   # So we'll have to look up its own information from system.local and add
   # the socket address manually.
   my $local_f = $conn->query( "SELECT data_center, rack FROM system.local", CONSISTENCY_ONE )
      ->then( sub {
         my ( $type, $result ) = @_;
         # Return the failure so that the combined future actually fails
         return Future->new->fail( "Expected 'rows' result" )
            unless defined $type and $type eq "rows";
         my $local = $result->row_hash( 0 );
         $local->{host} = $conn->nodeid;
         return Future->new->done( $local );
      });

   my $peers_f = $conn->query( "SELECT peer, data_center, rack FROM system.peers", CONSISTENCY_ONE )
      ->then( sub {
         my ( $type, $result ) = @_;
         return Future->new->fail( "Expected 'rows' result" )
            unless defined $type and $type eq "rows";
         my @nodes = $result->rows_hash;
         foreach my $node ( @nodes ) {
            # 'peer' is a packed address; store it in text form as 'host'
            $node->{host} = _inet_to_string( delete $node->{peer} );
         }
         return Future->new->done( @nodes );
      });

   return Future->needs_all( $local_f, $peers_f );
}
1035
1036=head1 TODO
1037
1038=over 8
1039
1040=item *
1041
1042Allow other load-balancing strategies than roundrobin.
1043
1044=item *
1045
1046Adjust connected primary nodes when changing C<primaries> parameter.
1047
1048=item *
1049
1050Allow backup nodes, for faster connection failover.
1051
1052=item *
1053
1054Support LZ4 compression when using CQL version 2.
1055
1056This is blocked on RT #92825
1057
1058=back
1059
1060=cut
1061
1062=head1 SPONSORS
1063
1064This code was paid for by
1065
1066=over 2
1067
1068=item *
1069
1070Perceptyx L<http://www.perceptyx.com/>
1071
1072=item *
1073
1074Shadowcat Systems L<http://www.shadow.cat>
1075
1076=back
1077
1078=head1 AUTHOR
1079
1080Paul Evans <leonerd@leonerd.org.uk>
1081
1082=cut
1083
# A Perl module must end by evaluating to a true value
0x55AA;
1085