1#!/usr/bin/perl
2
3use strict;
4use warnings;
5
6use Test::More;
7use Test::Identity;
8use Test::Refcount;
9
10use t::MockConnection;
11use Socket qw( pack_sockaddr_in inet_aton );
12
13use Net::Async::CassandraCQL;
14use Protocol::CassandraCQL::Result 0.06;
15
16# Mock the ->_connect_node method
17no warnings 'redefine';
18my %conns;
19local *Net::Async::CassandraCQL::_connect_node = sub {
20   my $self = shift;
21   my ( $connect_host, $connect_service ) = @_;
22   $conns{$connect_host} = my $conn = t::MockConnection->new( $connect_host );
23   return Future->new->done( $conn );
24};
25
26my $cass = Net::Async::CassandraCQL->new(
27   host => "10.0.0.1",
28);
29
30my $f = $cass->connect;
31
32ok( my $c = $conns{"10.0.0.1"}, 'Connected to 10.0.0.1' );
33
34# Initial nodelist query
35$c->send_nodelist(
36   local => { dc => "DC1", rack => "rack1" },
37   peers => {
38      "10.0.0.2" => { dc => "DC1", rack => "rack1" },
39   },
40);
41
42$f->get;
43
44ok( $c->is_registered, 'Using 10.0.0.1 for events' );
45
46# status changes
47{
48   my $nodeid;
49   $cass->configure(
50      on_node_up   => sub { ( undef, $nodeid ) = @_; },
51      on_node_down => sub { ( undef, $nodeid ) = @_; },
52   );
53
54   # DOWN
55   ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 does not yet have down_time' );
56
57   $conns{"10.0.0.1"}->invoke_event(
58      on_status_change => DOWN => pack_sockaddr_in( 0, inet_aton( "10.0.0.2" ) ),
59   );
60
61   ok( defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 has down_time after STATUS_CHANGE DOWN' );
62   is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_down' );
63   undef $nodeid;
64
65   # UP
66   $conns{"10.0.0.1"}->invoke_event(
67      on_status_change => UP => pack_sockaddr_in( 0, inet_aton( "10.0.0.2" ) ),
68   );
69
70   ok( !defined $cass->{nodes}{"10.0.0.2"}{down_time}, 'Node 10.0.0.2 no longer has down_time after STATUS_CHANGE UP' );
71   is( $nodeid, "10.0.0.2", '$nodeid to cluster on_node_up' );
72}
73
74# topology changes
75{
76   my $nodeid;
77   $cass->configure(
78      on_node_new    => sub { ( undef, $nodeid ) = @_; },
79      on_node_removed => sub { ( undef, $nodeid ) = @_; },
80   );
81
82   # NEW
83   ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 does not yet exist' );
84   $conns{"10.0.0.1"}->invoke_event(
85      on_topology_change => NEW_NODE => pack_sockaddr_in( 0, inet_aton( "10.0.0.4" ) ),
86   );
87
88   ok( defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 exists' );
89
90   ok( my $q = $conns{"10.0.0.1"}->next_query, 'Peerlist query exists' );
91   is( $q->[1], "SELECT peer, data_center, rack FROM system.peers WHERE peer = '10.0.0.4'", 'Peerlist query CQL' );
92   $q->[2]->done( rows =>
93      Protocol::CassandraCQL::Result->new(
94         columns => [
95            [ system => peers => peer        => "VARCHAR" ],
96            [ system => peers => data_center => "VARCHAR" ],
97            [ system => peers => rack        => "VARCHAR" ],
98         ],
99         rows => [
100            [ "\x0a\0\0\4", "DC1", "rack1" ],
101         ],
102      )
103   );
104
105   is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_new' );
106   ok( defined $cass->{nodes}{"10.0.0.4"}{data_center}, 'Node 10.0.0.4 has known DC' );
107   undef $nodeid;
108
109   # DELETE
110   $conns{"10.0.0.1"}->invoke_event(
111      on_topology_change => REMOVED_NODE => pack_sockaddr_in( 0, inet_aton( "10.0.0.4" ) ),
112   );
113
114   ok( !defined $cass->{nodes}{"10.0.0.4"}, 'Node 10.0.0.4 no longer exists' );
115
116   is( $nodeid, "10.0.0.4", '$nodeid to cluster on_node_removed' );
117}
118
119
120done_testing;
121