1# You may distribute under the terms of either the GNU General Public License 2# or the Artistic License (the same terms as Perl itself) 3# 4# (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk 5 6package Net::Async::CassandraCQL; 7 8use strict; 9use warnings; 10use 5.010; 11 12our $VERSION = '0.11'; 13 14use base qw( IO::Async::Notifier ); 15 16use Carp; 17 18use Future::Utils qw( fmap_void try_repeat_until_success ); 19use List::Util qw( shuffle ); 20use Scalar::Util qw( weaken ); 21use Socket qw( inet_ntop getnameinfo AF_INET AF_INET6 NI_NUMERICHOST NIx_NOSERV ); 22 23use Protocol::CassandraCQL qw( CONSISTENCY_ONE ); 24 25use Net::Async::CassandraCQL::Connection; 26 27use constant DEFAULT_CQL_PORT => 9042; 28 29# Time after which down nodes will be retried 30use constant NODE_RETRY_TIME => 60; 31 32=head1 NAME 33 34C<Net::Async::CassandraCQL> - use Cassandra databases with L<IO::Async> using CQL 35 36=head1 SYNOPSIS 37 38 use IO::Async::Loop; 39 use Net::Async::CassandraCQL; 40 use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM ); 41 42 my $loop = IO::Async::Loop->new; 43 44 my $cass = Net::Async::CassandraCQL->new( 45 host => "localhost", 46 keyspace => "my-keyspace", 47 default_consistency => CONSISTENCY_QUORUM, 48 ); 49 $loop->add( $cass ); 50 51 52 $cass->connect->get; 53 54 55 my @f; 56 foreach my $number ( 1 .. 100 ) { 57 push @f, $cass->query( "INSERT INTO numbers (v) VALUES $number" ); 58 } 59 Future->needs_all( @f )->get; 60 61 62 my $get_stmt = $cass->prepare( "SELECT v FROM numbers" )->get; 63 64 my ( undef, $result ) = $get_stmt->execute( [] )->get; 65 66 foreach my $row ( $result->rows_hash ) { 67 say "We have a number " . $row->{v}; 68 } 69 70=head1 DESCRIPTION 71 72This module allows use of the C<CQL3> interface of a Cassandra database. It 73fully supports asynchronous operation via L<IO::Async>, allowing both direct 74queries and prepared statements to be managed concurrently, if required. 75Alternatively, as the interface is entirely based on L<Future> objects, it can 76be operated synchronously in a blocking fashion by simply awaiting each 77individual operation by calling the C<get> method. 78 79It is based on L<Protocol::CassandraCQL>, which more completely documents the 80behaviours and limits of its ability to communicate with Cassandra. 81 82=cut 83 84=head1 EVENTS 85 86=head2 on_node_up $nodeid 87 88=head2 on_node_down $nodeid 89 90The node's status has changed. C<$nodeid> is the node's IP address as a text 91string. 92 93=head2 on_node_new $nodeid 94 95=head2 on_node_removed $nodeid 96 97A new node has been added to the cluster, or an existing node has been 98decommissioned and removed. 99 100These four events are obtained from event watches on the actual node 101connections and filtered to remove duplicates. The use of multiple primaries 102should improve the reliability of notifications, though if multiple nodes fail 103at or around the same time this may go unreported, as no node will ever report 104its own failure. 105 106=cut 107 108=head1 PARAMETERS 109 110The following named parameters may be passed to C<new> or C<configure>: 111 112=over 8 113 114=item host => STRING 115 116=item hosts => ARRAY of STRING 117 118The hostnames of Cassandra node to connect to initially. If more than one host 119is provided in an array, they will be attempted sequentially until one 120succeeds during the intial connect phase. 121 122=item service => STRING 123 124Optional. The service name or port number to connect to. 125 126=item username => STRING 127 128=item password => STRING 129 130Optional. Authentication details to use for C<PasswordAuthenticator>. 131 132=item keyspace => STRING 133 134Optional. If set, a C<USE keyspace> query will be issued as part of the 135connect method. 136 137=item default_consistency => INT 138 139Optional. Default consistency level to use if none is provided to C<query> or 140C<execute>. 141 142=item primaries => INT 143 144Optional. The number of primary node connections to maintain. Defaults to 1 if 145not specified. 146 147=item prefer_dc => STRING 148 149Optional. If set, prefer to pick primary nodes from the given data center, 150only falling back on others if there are not enough available. 151 152=item cql_version => INT 153 154Optional. Version of the CQL wire protocol to negotiate during connection. 155Defaults to 1. 156 157=back 158 159=cut 160 161sub _init 162{ 163 my $self = shift; 164 my ( $params ) = @_; 165 166 $params->{primaries} //= 1; 167 168 # precache these weasels only once 169 $self->{on_topology_change_cb} = $self->_replace_weakself( sub { 170 shift->_on_topology_change( @_ ); 171 }); 172 $self->{on_status_change_cb} = $self->_replace_weakself( sub { 173 shift->_on_status_change( @_ ); 174 }); 175 176 $self->{queries_by_cql} = {}; # {$cql} => [$query, $expire_timer_f] 177 # Can be in two states: 178 # $query is_weak; timer undef => normal user use 179 # $query non-weak; timer exists => due to expire soon 180 181 $self->{cql_version} = 1; 182 183 $self->SUPER::_init( $params ); 184} 185 186sub configure 187{ 188 my $self = shift; 189 my %params = @_; 190 191 if( defined $params{host} ) { 192 $params{hosts} ||= [ delete $params{host} ]; 193 } 194 195 foreach (qw( hosts service username password keyspace default_consistency 196 prefer_dc cql_version 197 on_node_up on_node_down on_node_new on_node_removed )) { 198 $self->{$_} = delete $params{$_} if exists $params{$_}; 199 } 200 201 if( exists $params{primaries} ) { 202 $self->{primaries} = delete $params{primaries}; 203 204 # TODO: connect more / drain old ones 205 } 206 207 $self->SUPER::configure( %params ); 208} 209 210=head1 METHODS 211 212=cut 213 214# function 215sub _inet_to_string 216{ 217 my ( $addr ) = @_; 218 219 my $addrlen = length $addr; 220 my $family = $addrlen == 4 ? AF_INET : 221 $addrlen == 16 ? AF_INET6 : 222 die "Expected ADDRLEN 4 or 16"; 223 return inet_ntop( $family, $addr ); 224} 225 226# function 227sub _nodeid_to_string 228{ 229 my ( $node ) = @_; 230 231 return ( getnameinfo( $node, NI_NUMERICHOST, NIx_NOSERV ) )[1]; 232} 233 234=head2 $str = $cass->quote( $str ) 235 236Quotes a string argument suitable for inclusion in an immediate CQL query 237string. 238 239In general, it is better to use a prepared query and pass the value as an 240execute parameter though. 241 242=cut 243 244sub quote 245{ 246 my $self = shift; 247 my ( $str ) = @_; 248 249 # CQL's 'quoting' handles any character except quote marks, which have to 250 # be doubled 251 $str =~ s/'/''/g; 252 return qq('$str'); 253} 254 255=head2 $str = $cass->quote_identifier( $str ) 256 257Quotes an identifier name suitable for inclusion in a CQL query string. 258 259=cut 260 261sub quote_identifier 262{ 263 my $self = shift; 264 my ( $str ) = @_; 265 266 return $str if $str =~ m/^[a-z_][a-z0-9_]+$/; 267 268 # CQL's "quoting" handles any character except quote marks, which have to 269 # be doubled 270 $str =~ s/"/""/g; 271 return qq("$str"); 272} 273 274=head2 $cass->connect( %args ) ==> () 275 276Connects to the Cassandra node and starts up the connection. The returned 277Future will yield nothing on success. 278 279Takes the following named arguments: 280 281=over 8 282 283=item host => STRING 284 285=item hosts => ARRAY of STRING 286 287=item service => STRING 288 289=back 290 291A set of host names are required, either as a named argument or as a 292configured value on the object. If the service name is missing, the default 293CQL port will be used instead. 294 295=cut 296 297# ->_connect_node( $host, $service ) ==> $conn 298# mocked during unit testing 299sub _connect_node 300{ 301 my $self = shift; 302 my ( $host, $service ) = @_; 303 304 $service //= $self->{service} // DEFAULT_CQL_PORT; 305 306 my $conn = Net::Async::CassandraCQL::Connection->new( 307 on_closed => sub { 308 my $node = shift; 309 $self->remove_child( $node ); 310 $self->_closed_node( $node->nodeid ); 311 }, 312 map { $_ => $self->{$_} } qw( username password cql_version ), 313 ); 314 $self->add_child( $conn ); 315 316 $conn->connect( 317 host => $host, 318 service => $service, 319 )->on_fail( sub { 320 # Some kinds of failure have already removed it 321 $self->remove_child( $conn ) if $conn->parent; 322 }); 323} 324 325# invoked during unit testing 326sub _closed_node 327{ 328 my $self = shift; 329 my ( $nodeid ) = @_; 330 331 my $now = time(); 332 333 $self->{nodes} or return; 334 my $node = $self->{nodes}{$nodeid} or return; 335 336 undef $node->{conn}; 337 undef $node->{ready_f}; 338 $node->{down_time} = $now; 339 340 if( exists $self->{primary_ids}{$nodeid} ) { 341 $self->debug_printf( "PRIMARY DOWN %s", $nodeid ); 342 delete $self->{primary_ids}{$nodeid}; 343 344 $self->_pick_new_primary( $now ); 345 } 346 347 if( exists $self->{event_ids}{$nodeid} ) { 348 delete $self->{event_ids}{$nodeid}; 349 350 $self->_pick_new_eventwatch; 351 } 352} 353 354sub _list_nodeids 355{ 356 my $self = shift; 357 358 my $nodes = $self->{nodes}; 359 360 my @nodeids = shuffle keys %$nodes; 361 if( defined( my $dc = $self->{prefer_dc} ) ) { 362 # Put preferred ones first 363 @nodeids = ( ( grep { $nodes->{$_}{data_center} eq $dc } @nodeids ), 364 ( grep { $nodes->{$_}{data_center} ne $dc } @nodeids ) ); 365 } 366 367 return @nodeids; 368} 369 370sub connect 371{ 372 my $self = shift; 373 my %args = @_; 374 375 my $conn; 376 377 my @hosts = $args{hosts} ? @{ $args{hosts} } : 378 defined $args{host} ? ( $args{host} ) : 379 @{ $self->{hosts} || [] }; 380 @hosts or croak "Require initial hostnames to ->connect to"; 381 382 ( try_repeat_until_success { 383 $self->_connect_node( $_[0], $args{service} ) 384 } foreach => \@hosts )->then( sub { 385 ( $conn ) = @_; 386 $self->_list_nodes( $conn ); 387 })->then( sub { 388 my @nodes = @_; 389 390 $self->{nodes} = \my %nodes; 391 foreach my $node ( @nodes ) { 392 my $n = $nodes{$node->{host}} = { 393 data_center => $node->{data_center}, 394 rack => $node->{rack}, 395 }; 396 397 if( $node->{host} eq $conn->nodeid ) { 398 $n->{conn} = $conn; 399 } 400 } 401 402 # Initial primary on the seed 403 $self->{primary_ids} = { 404 $conn->nodeid => 1, 405 }; 406 my $primary0 = $nodes{$conn->nodeid}; 407 my $have_primaries = 1; 408 409 my @conn_f; 410 411 $self->debug_printf( "PRIMARY PICKED %s", $conn->nodeid ); 412 push @conn_f, $primary0->{ready_f} = $self->_ready_node( $conn->nodeid ); 413 414 my @nodeids = $self->_list_nodeids; 415 416 while( @nodeids and $have_primaries < $self->{primaries} ) { 417 my $primary = shift @nodeids; 418 next if $primary eq $conn->nodeid; 419 420 push @conn_f, $self->_connect_new_primary( $primary ); 421 $have_primaries++; 422 } 423 424 $self->_pick_new_eventwatch; 425 $self->_pick_new_eventwatch if $have_primaries > 1; 426 427 return $conn_f[0] if @conn_f == 1; 428 return Future->needs_all( @conn_f ); 429 }); 430} 431 432sub _pick_new_primary 433{ 434 my $self = shift; 435 my ( $now ) = @_; 436 437 my $nodes = $self->{nodes}; 438 439 my $new_primary; 440 441 # Expire old down statuses and try to find a non-down node that is not yet 442 # primary 443 foreach my $nodeid ( $self->_list_nodeids ) { 444 my $node = $nodes->{$nodeid}; 445 446 delete $node->{down_time} if defined $node->{down_time} and $now - $node->{down_time} > NODE_RETRY_TIME; 447 448 next if $self->{primary_ids}{$nodeid}; 449 450 $new_primary ||= $nodeid if !$node->{down_time}; 451 } 452 453 if( !defined $new_primary ) { 454 die "ARGH! TODO: can't find a new node to be primary\n"; 455 } 456 457 $self->_connect_new_primary( $new_primary ); 458} 459 460sub _connect_new_primary 461{ 462 my $self = shift; 463 my ( $new_primary ) = @_; 464 465 $self->debug_printf( "PRIMARY PICKED %s", $new_primary ); 466 $self->{primary_ids}{$new_primary} = 1; 467 468 my $node = $self->{nodes}{$new_primary}; 469 470 my $f = $node->{ready_f} = $self->_connect_node( $new_primary )->then( sub { 471 my ( $conn ) = @_; 472 $node->{conn} = $conn; 473 474 $self->_ready_node( $new_primary ) 475 })->on_fail( sub { 476 print STDERR "ARGH! NEW PRIMARY FAILED: @_\n"; 477 })->on_done( sub { 478 $self->debug_printf( "PRIMARY UP %s", $new_primary ); 479 }); 480} 481 482sub _ready_node 483{ 484 my $self = shift; 485 my ( $nodeid ) = @_; 486 487 my $node = $self->{nodes}{$nodeid} or die "Don't have a node id $nodeid"; 488 my $conn = $node->{conn} or die "Expected node to have a {conn} but it doesn't"; 489 490 my $keyspace = $self->{keyspace}; 491 492 my $keyspace_f = 493 $keyspace ? $conn->query( "USE " . $self->quote_identifier( $keyspace ), CONSISTENCY_ONE ) 494 : Future->new->done; 495 496 $keyspace_f->then( sub { 497 my $conn_f = Future->new->done( $conn ); 498 return $conn_f unless my $queries_by_cql = $self->{queries_by_cql}; 499 return $conn_f unless keys %$queries_by_cql; 500 501 ( fmap_void { 502 my $query = shift->[0] or return Future->new->done; 503 $conn->prepare( $query->cql, $self ); 504 } foreach => [ values %$queries_by_cql ] ) 505 ->then( sub { $conn_f } ); 506 }); 507} 508 509sub _pick_new_eventwatch 510{ 511 my $self = shift; 512 513 my @primaries = keys %{ $self->{primary_ids} }; 514 515 { 516 my $nodeid = $primaries[rand @primaries]; 517 redo if $self->{event_ids}{$nodeid}; 518 519 $self->{event_ids}{$nodeid} = 1; 520 521 my $node = $self->{nodes}{$nodeid}; 522 $node->{ready_f}->on_done( sub { 523 my $conn = shift; 524 $conn->configure( 525 on_topology_change => $self->{on_topology_change_cb}, 526 on_status_change => $self->{on_status_change_cb}, 527 ); 528 $conn->register( [qw( TOPOLOGY_CHANGE STATUS_CHANGE )] ) 529 ->on_fail( sub { 530 delete $self->{event_ids}{$nodeid}; 531 $self->_pick_new_eventwatch 532 }); 533 }); 534 } 535} 536 537sub _on_topology_change 538{ 539 my $self = shift; 540 my ( $type, $addr ) = @_; 541 my $nodeid = _nodeid_to_string( $addr ); 542 543 my $nodes = $self->{nodes}; 544 545 # These updates can happen twice if there's two event connections but 546 # that's OK. Use the state to ensure printing only once 547 548 if( $type eq "NEW_NODE" ) { 549 return if exists $nodes->{$nodeid}; 550 551 $nodes->{$nodeid} = {}; 552 553 my $f = $self->query_rows( "SELECT peer, data_center, rack FROM system.peers WHERE peer = " . $self->quote( $nodeid ), CONSISTENCY_ONE ) 554 ->on_done( sub { 555 my ( $result ) = @_; 556 my $node = $result->row_hash( 0 ); 557 $node->{host} = _inet_to_string( delete $node->{peer} ); 558 559 %{$nodes->{$nodeid}} = %$node; 560 561 $self->debug_printf( "NEW_NODE {%s}", $nodeid ); 562 $self->maybe_invoke_event( on_node_new => $nodeid ); 563 }); 564 565 # Intentional cycle 566 $f->on_ready( sub { undef $f } ); 567 } 568 elsif( $type eq "REMOVED_NODE" ) { 569 return if !exists $nodes->{$nodeid}; 570 571 delete $nodes->{$nodeid}; 572 $self->debug_printf( "REMOVED_NODE {%s}", $nodeid ); 573 $self->maybe_invoke_event( on_node_removed => $nodeid ); 574 } 575} 576 577sub _on_status_change 578{ 579 my $self = shift; 580 my ( $status, $addr ) = @_; 581 my $nodeid = _nodeid_to_string( $addr ); 582 583 my $nodes = $self->{nodes}; 584 my $node = $nodes->{$nodeid} or return; 585 586 # These updates can happen twice if there's two event connections but 587 # that's OK. Use the state to ensure printing only once 588 589 if( $status eq "DOWN" ) { 590 return if exists $node->{down_time}; 591 592 $self->debug_printf( "STATUS DOWN on {%s}", $nodeid ); 593 $self->maybe_invoke_event( on_node_down => $nodeid ); 594 595 $node->{down_time} = time(); 596 } 597 elsif( $status eq "UP" ) { 598 return if !exists $node->{down_time}; 599 600 $self->debug_printf( "STATUS UP on {%s}", $nodeid ); 601 $self->maybe_invoke_event( on_node_up => $nodeid ); 602 603 delete $node->{down_time}; 604 605 return unless defined( my $dc = $self->{prefer_dc} ); 606 return unless $node->{data_center} eq $dc; 607 return if $node->{conn}; 608 609 # A node in a preferred data center is now up, and we don't already have 610 # a connection to it 611 612 my $old_nodeid; 613 $nodes->{$_}{data_center} ne $dc and $old_nodeid = $_, last for keys %{ $self->{primary_ids} }; 614 615 return unless defined $old_nodeid; 616 617 # We do have a connection to a non-preferred node, so lets switch it 618 619 $self->_connect_new_primary( $nodeid ); 620 621 # Don't pick it for new nodes 622 $self->debug_printf( "PRIMARY SWITCH %s -> %s", $old_nodeid, $nodeid ); 623 delete $self->{primary_ids}{$old_nodeid}; 624 625 # Close it when it's empty 626 $nodes->{$old_nodeid}{conn}->close_when_idle; 627 } 628} 629 630=head2 $cass->close_when_idle ==> $cass 631 632Stops accepting new queries and prepares all the existing connections to be 633closed once every outstanding query has been responded to. Returns a future 634that will eventually yield the CassandraCQL object, when all the connections 635are closed. 636 637After calling this method it will be an error to invoke C<query>, C<prepare>, 638C<execute> or the various other methods derived from them. 639 640=cut 641 642sub close_when_idle 643{ 644 my $self = shift; 645 646 my $nodes = $self->{nodes}; 647 648 # remove 'nodes' to avoid reconnect logic 649 undef $self->{nodes}; 650 undef $self->{primary_ids}; 651 652 my @f; 653 foreach my $node ( values %$nodes ) { 654 next unless my $conn = $node->{conn}; 655 push @f, $conn->close_when_idle; 656 } 657 658 return Future->wait_all( @f )->then( sub { 659 return Future->new->done( $self ); 660 }); 661} 662 663=head2 $cass->close_now 664 665Immediately closes all node connections and shuts down the object. Any 666outstanding or queued queries will immediately fail. Consider this as a "last 667resort" failure shutdown, as compared to the graceful draining behaviour of 668C<close_when_idle>. 669 670=cut 671 672sub close_now 673{ 674 my $self = shift; 675 676 my $nodes = $self->{nodes}; 677 678 # remove 'nodes' to avoid reconnect logic 679 undef $self->{nodes}; 680 undef $self->{primary_ids}; 681 682 foreach my $node ( values %$nodes ) { 683 next unless my $conn = $node->{conn}; 684 $conn->close_now; 685 } 686} 687 688sub _get_a_node 689{ 690 my $self = shift; 691 692 my $nodes = $self->{nodes} or die "No available nodes"; 693 694 # TODO: Other sorting strategies; 695 # e.g. fewest outstanding queries, least accumulated time recently 696 my @nodeids; 697 { 698 my $next = $self->{next_primary} // 0; 699 @nodeids = keys %{ $self->{primary_ids} } or die "ARGH: $self -> _get_a_node called with no defined primaries"; 700 701 # Rotate to the next in sequence 702 @nodeids = ( @nodeids[$next..$#nodeids], @nodeids[0..$next-1] ); 703 ( $next += 1 ) %= @nodeids; 704 705 my $next_ready = $next; 706 # Skip non-ready ones 707 while( not $nodes->{$nodeids[0]}->{ready_f}->is_ready ) { 708 push @nodeids, shift @nodeids; 709 ( $next_ready += 1 ) %= @nodeids; 710 last if $next_ready == $next; # none were ready - just use the next 711 } 712 $self->{next_primary} = $next_ready; 713 } 714 715 if( my $node = $nodes->{ $nodeids[0] } ) { 716 return $node->{ready_f}; 717 } 718 719 die "ARGH: don't have a primary node"; 720} 721 722=head2 $cass->query( $cql, $consistency, %other_args ) ==> ( $type, $result ) 723 724Performs a CQL query. On success, the values returned from the Future will 725depend on the type of query. 726 727For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving 728the name of the new keyspace. 729 730For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and 731C<$result> is a 3-element ARRAY reference containing the type of change, the 732keyspace and the table name. 733 734For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of 735L<Protocol::CassandraCQL::Result> containing the returned row data. 736 737For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the future 738returns nothing. 739 740C<%other_args> may be any of the following, when using C<cql_version> 2 or 741above: 742 743=over 8 744 745=item skip_metadata => BOOL 746 747Requests the server does not include result metadata in the response. It will 748be up to the caller to provide this, via C<set_metadata> on the returned 749Result object, before it can be used. 750 751=item page_size => INT 752 753Requests that the server returns at most the given number of rows. If any 754further remain, the result object will include the C<paging_state> field. This 755can be passed in another C<query> call to obtain the next set of data. 756 757=item paging_state => INT 758 759Requests that the server continues a paged request from this position, given 760in a previous response. 761 762=item serial_consistency => INT 763 764Sets the consistency level for serial operations in the query. Must be one of 765C<CONSISTENCY_SERIAL> or C<CONSISTENCY_LOCAL_SERIAL>. 766 767=back 768 769=cut 770 771sub _debug_wrap_result 772{ 773 my ( $op, $self, $f ) = @_; 774 775 $f->on_ready( sub { 776 my $f = shift; 777 if( $f->failure ) { 778 $self->debug_printf( "$op => FAIL %s", scalar $f->failure ); 779 } 780 elsif( my ( $type, $result ) = $f->get ) { 781 if( $type eq "rows" ) { 782 $result = sprintf "%d x %d columns", $result->rows, $result->columns; 783 } 784 elsif( $type eq "schema_change" ) { 785 $result = sprintf "%s %s", $result->[0], join ".", @{$result}[1..$#$result]; 786 } 787 $self->debug_printf( "$op => %s %s", uc $type, $result ); 788 } 789 else { 790 $self->debug_printf( "$op => VOID" ); 791 } 792 }) if $IO::Async::Notifier::DEBUG; 793 794 return $f; 795} 796 797sub query 798{ 799 my $self = shift; 800 my ( $cql, $consistency, %other_args ) = @_; 801 802 $consistency //= $self->{default_consistency}; 803 defined $consistency or croak "'query' needs a consistency level"; 804 805 _debug_wrap_result QUERY => $self, $self->_get_a_node->then( sub { 806 my $node = shift; 807 $self->debug_printf( "QUERY on {%s}: %s", $node->nodeid, $cql ); 808 $node->query( $cql, $consistency, %other_args ); 809 }); 810} 811 812=head2 $cass->query_rows( $cql, $consistency, %other_args ) ==> $result 813 814A shortcut wrapper for C<query> which expects a C<rows> result and returns it 815directly. Any other result is treated as an error. The returned Future returns 816a C<Protocol::CassandraCQL::Result> directly 817 818=cut 819 820sub query_rows 821{ 822 my $self = shift; 823 824 $self->query( @_ )->then( sub { 825 my ( $type, $result ) = @_; 826 $type eq "rows" or Future->new->fail( "Expected 'rows' result" ); 827 Future->new->done( $result ); 828 }); 829} 830 831=head2 $cass->prepare( $cql ) ==> $query 832 833Prepares a CQL query for later execution. On success, the returned Future 834yields an instance of a prepared query object (see below). 835 836Query objects stored internally cached by the CQL string; subsequent calls to 837C<prepare> with the same exact CQL string will yield the same object 838immediately, saving a roundtrip. 839 840=cut 841 842sub prepare 843{ 844 my $self = shift; 845 my ( $cql ) = @_; 846 847 my $nodes = $self->{nodes} or die "No available nodes"; 848 849 my $queries_by_cql = $self->{queries_by_cql}; 850 851 if( my $q = $queries_by_cql->{$cql} ) { 852 my $query = $q->[0]; 853 if( $q->[1] ) { 854 $q->[1]->cancel; 855 undef $q->[1]; 856 weaken( $q->[0] ); 857 } 858 return Future->new->done( $query ); 859 } 860 861 $self->debug_printf( "PREPARE %s", $cql ); 862 863 my @prepare_f = map { 864 my $node = $nodes->{$_}{conn}; 865 $node->prepare( $cql, $self ) 866 } keys %{ $self->{primary_ids} }; 867 868 Future->needs_all( @prepare_f )->then( sub { 869 my ( $query ) = @_; 870 # Ignore the other objects; they'll all have the same ID anyway 871 872 $self->debug_printf( "PREPARE => [%s]", unpack "H*", $query->id ); 873 874 my $q = $queries_by_cql->{$cql} = [ $query, undef ]; 875 weaken( $q->[0] ); 876 877 Future->new->done( $query ); 878 }); 879} 880 881sub _expire_query 882{ 883 my $self = shift; 884 my ( $cql ) = @_; 885 886 my $queries_by_cql = $self->{queries_by_cql}; 887 my $q = $queries_by_cql->{$cql} or return; 888 889 my $query = $q->[0]; undef $q->[0]; $q->[0] = $query; # unweaken 890 891 $q->[1] = $self->loop->delay_future( after => 60 ) 892 ->on_done( sub { 893 # Remove the {cassandra} element from the query so it doesn't 894 # re-register itself for expiry when it is DESTROYed again 895 undef $q->[0]{cassandra}; 896 delete $queries_by_cql->{$cql}; 897 }); 898} 899 900=head2 $cass->execute( $query, $data, $consistency, %other_args ) ==> ( $type, $result ) 901 902Executes a previously-prepared statement, given the binding data. On success, 903the returned Future will yield results of the same form as the C<query> 904method. C<$data> should contain a list of encoded byte-string values. 905 906Normally this method is not directly required - instead, use the C<execute> 907method on the query object itself, as this will encode the parameters 908correctly. 909 910C<%other_args> may be as for the C<query> method. 911 912=cut 913 914sub execute 915{ 916 my $self = shift; 917 my ( $query, $data, $consistency, %other_args ) = @_; 918 919 $consistency //= $self->{default_consistency}; 920 defined $consistency or croak "'execute' needs a consistency level"; 921 922 _debug_wrap_result EXECUTE => $self, $self->_get_a_node->then( sub { 923 my $node = shift; 924 $self->debug_printf( "EXECUTE on {%s}: %s [%s]", $node->nodeid, $query->cql, unpack "H*", $query->id ); 925 $node->execute( $query->id, $data, $consistency, %other_args ); 926 }); 927} 928 929=head1 CONVENIENT WRAPPERS 930 931The following wrapper methods all wrap the basic C<query> operation. 932 933=cut 934 935=head2 $cass->schema_keyspaces ==> $result 936 937A shortcut to a C<SELECT> query on C<system.schema_keyspaces>, which returns a 938result object listing all the keyspaces. 939 940Exact details of the returned columns will depend on the Cassandra version, 941but the result should at least be keyed by the first column, called 942C<keyspace_name>. 943 944 my $keyspaces = $result->rowmap_hash( "keyspace_name" ) 945 946=cut 947 948sub schema_keyspaces 949{ 950 my $self = shift; 951 952 $self->query_rows( 953 "SELECT * FROM system.schema_keyspaces", 954 CONSISTENCY_ONE 955 ); 956} 957 958=head2 $cass->schema_columnfamilies( $keyspace ) ==> $result 959 960A shortcut to a C<SELECT> query on C<system.schema_columnfamilies>, which 961returns a result object listing all the columnfamilies of the given keyspace. 962 963Exact details of the returned columns will depend on the Cassandra version, 964but the result should at least be keyed by the first column, called 965C<columnfamily_name>. 966 967 my $columnfamilies = $result->rowmap_hash( "columnfamily_name" ) 968 969=cut 970 971sub schema_columnfamilies 972{ 973 my $self = shift; 974 my ( $keyspace ) = @_; 975 976 $self->query_rows( 977 "SELECT * FROM system.schema_columnfamilies WHERE keyspace_name = " . $self->quote( $keyspace ), 978 CONSISTENCY_ONE 979 ); 980} 981 982=head2 $cass->schema_columns( $keyspace, $columnfamily ) ==> $result 983 984A shortcut to a C<SELECT> query on C<system.schema_columns>, which returns a 985result object listing all the columns of the given columnfamily. 986 987Exact details of the returned columns will depend on the Cassandra version, 988but the result should at least be keyed by the first column, called 989C<column_name>. 990 991 my $columns = $result->rowmap_hash( "column_name" ) 992 993=cut 994 995sub schema_columns 996{ 997 my $self = shift; 998 my ( $keyspace, $columnfamily ) = @_; 999 1000 $self->query_rows( 1001 "SELECT * FROM system.schema_columns WHERE keyspace_name = " . $self->quote( $keyspace ) . " AND columnfamily_name = " . $self->quote( $columnfamily ), 1002 CONSISTENCY_ONE, 1003 ); 1004} 1005 1006sub _list_nodes 1007{ 1008 my $self = shift; 1009 my ( $conn ) = @_; 1010 1011 # The system.peers table doesn't include the node we actually connect to. 1012 # So we'll have to look up its own information from system.local and add 1013 # the socket address manually. 1014 Future->needs_all( 1015 $conn->query( "SELECT data_center, rack FROM system.local", CONSISTENCY_ONE ) 1016 ->then( sub { 1017 my ( $type, $result ) = @_; 1018 $type eq "rows" or Future->new->fail( "Expected 'rows' result" ); 1019 my $local = $result->row_hash( 0 ); 1020 $local->{host} = $conn->nodeid; 1021 Future->new->done( $local ); 1022 }), 1023 $conn->query( "SELECT peer, data_center, rack FROM system.peers", CONSISTENCY_ONE ) 1024 ->then( sub { 1025 my ( $type, $result ) = @_; 1026 $type eq "rows" or Future->new->fail( "Expected 'rows' result" ); 1027 my @nodes = $result->rows_hash; 1028 foreach my $node ( @nodes ) { 1029 $node->{host} = _inet_to_string( delete $node->{peer} ); 1030 } 1031 Future->new->done( @nodes ); 1032 }), 1033 ) 1034} 1035 1036=head1 TODO 1037 1038=over 8 1039 1040=item * 1041 1042Allow other load-balancing strategies than roundrobin. 1043 1044=item * 1045 1046Adjust connected primary nodes when changing C<primaries> parameter. 1047 1048=item * 1049 1050Allow backup nodes, for faster connection failover. 1051 1052=item * 1053 1054Support LZ4 compression when using CQL version 2. 1055 1056This is blocked on RT #92825 1057 1058=back 1059 1060=cut 1061 1062=head1 SPONSORS 1063 1064This code was paid for by 1065 1066=over 2 1067 1068=item * 1069 1070Perceptyx L<http://www.perceptyx.com/> 1071 1072=item * 1073 1074Shadowcat Systems L<http://www.shadow.cat> 1075 1076=back 1077 1078=head1 AUTHOR 1079 1080Paul Evans <leonerd@leonerd.org.uk> 1081 1082=cut 1083 10840x55AA; 1085