#!/usr/bin/perl

# Checks how a Net::Async::CassandraCQL cluster object reacts to server events
# delivered on its event connection:
#   * STATUS_CHANGE   (DOWN / UP)               -> on_node_down / on_node_up
#   * TOPOLOGY_CHANGE (NEW_NODE / REMOVED_NODE) -> on_node_new / on_node_removed
# All network activity is replaced by t::MockConnection objects.

use strict;
use warnings;

use Test::More;
use Test::Identity;
use Test::Refcount;

use t::MockConnection;
use Socket qw( pack_sockaddr_in inet_aton );

use Net::Async::CassandraCQL;
use Protocol::CassandraCQL::Result 0.06;

# Mock the ->_connect_node method so that no real socket is ever opened.
# Every mock connection is remembered in %conns, keyed by the host it was
# asked to connect to, so the tests can drive it directly.
no warnings 'redefine';
my %conns;
local *Net::Async::CassandraCQL::_connect_node = sub {
    my $self = shift;
    my ( $connect_host, $connect_service ) = @_;
    $conns{$connect_host} = my $conn = t::MockConnection->new( $connect_host );
    return Future->new->done( $conn );
};

# Builds the packed sockaddr_in (port 0) for a dotted-quad IPv4 address; this
# is the address form passed to the mocked event handlers below.
sub sockaddr_for {
    my ( $ip ) = @_;
    return pack_sockaddr_in( 0, inet_aton( $ip ) );
}

my $cass = Net::Async::CassandraCQL->new(
    host => "10.0.0.1",
);

my $f = $cass->connect;

ok( my $c = $conns{"10.0.0.1"}, 'Connected to 10.0.0.1' );

# Initial nodelist query
$c->send_nodelist(
    local => { dc => "DC1", rack => "rack1" },
    peers => {
        "10.0.0.2" => { dc => "DC1", rack => "rack1" },
    },
);

# Propagates any failure of the connect sequence as an exception.
$f->get;

ok( $c->is_registered, 'Using 10.0.0.1 for events' );

# status changes
{
    # Each handler records its own name as well as the node ID it was given,
    # so the assertions can tell the two handlers apart.
    my ( $event, $nodeid );
    $cass->configure(
        on_node_up   => sub { ( undef, $nodeid ) = @_; $event = "on_node_up"; },
        on_node_down => sub { ( undef, $nodeid ) = @_; $event = "on_node_down"; },
    );

    # DOWN
    ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 does not yet have down_time' );

    $conns{"10.0.0.1"}->invoke_event(
        on_status_change => DOWN => sockaddr_for( "10.0.0.2" ),
    );

    ok( defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 has down_time after STATUS_CHANGE DOWN' );
    is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_down' );
    is( $event, "on_node_down", 'STATUS_CHANGE DOWN invokes on_node_down' );
    undef $nodeid;
    undef $event;

    # UP
    $conns{"10.0.0.1"}->invoke_event(
        on_status_change => UP => sockaddr_for( "10.0.0.2" ),
    );

    ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 no longer has down_time after STATUS_CHANGE UP' );
    is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_up' );
    is( $event, "on_node_up", 'STATUS_CHANGE UP invokes on_node_up' );
}

# topology changes
{
    # Same recording scheme as above, for the topology handlers.
    my ( $event, $nodeid );
    $cass->configure(
        on_node_new     => sub { ( undef, $nodeid ) = @_; $event = "on_node_new"; },
        on_node_removed => sub { ( undef, $nodeid ) = @_; $event = "on_node_removed"; },
    );

    # NEW
    ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 does not yet exist' );
    $conns{"10.0.0.1"}->invoke_event(
        on_topology_change => NEW_NODE => sockaddr_for( "10.0.0.4" ),
    );

    ok( defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 exists' );

    # The new node's details are looked up with a query on the existing
    # connection; the mock exposes it as [ ?, CQL text, result future ].
    ok( my $q = $conns{"10.0.0.1"}->next_query, 'Peerlist query exists' );
    is( $q->[1], "SELECT peer, data_center, rack FROM system.peers WHERE peer = '10.0.0.4'", 'Peerlist query CQL' );
    $q->[2]->done( rows =>
        Protocol::CassandraCQL::Result->new(
            columns => [
                [ system => peers => peer        => "VARCHAR" ],
                [ system => peers => data_center => "VARCHAR" ],
                [ system => peers => rack        => "VARCHAR" ],
            ],
            rows => [
                # "\x0a\0\0\4" is the four raw address bytes of 10.0.0.4
                [ "\x0a\0\0\4", "DC1", "rack1" ],
            ],
        )
    );

    is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_new' );
    is( $event, "on_node_new", 'TOPOLOGY_CHANGE NEW_NODE invokes on_node_new' );
    ok( defined $cass->{nodes}{"10.0.0.4"}{data_center}, 'Node 10.0.0.4 has known DC' );
    undef $nodeid;
    undef $event;

    # DELETE
    $conns{"10.0.0.1"}->invoke_event(
        on_topology_change => REMOVED_NODE => sockaddr_for( "10.0.0.4" ),
    );

    ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 no longer exists' );

    is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_removed' );
    is( $event, "on_node_removed", 'TOPOLOGY_CHANGE REMOVED_NODE invokes on_node_removed' );
}


done_testing;