#!perl

## Pub/sub tests for Redis::Fast: subscribe/psubscribe handler bookkeeping,
## wait_for_messages() behaviour, and what happens to a subscriber when its
## server is killed or restarted.

use warnings;
use strict;
use Test::More;
use Test::Fatal;
use Test::Deep;
use Test::SharedFork;
use Redis::Fast;
use lib 't/tlib';
use Test::SpawnRedisServer qw( redis reap );

## Main test server: $c is called at exit to stop it, $srv is the address
## handed to every Redis::Fast->new(server => ...) below.
my ($c, $srv) = redis();
END { $c->() if $c }

## Skip the whole file when the server rejects PUBLISH as an unknown command.
{
    my $r = Redis::Fast->new(server => $srv);
    eval { $r->publish('aa', 'v1') };
    plan 'skip_all' => "pubsub not implemented on this redis server" if $@ && $@ =~ /unknown command/;
}

## Kill switches for the extra servers spawned by the last two subtests.
## Those subtests assign to these file-scoped variables (they must NOT
## re-declare them with `my`), otherwise this END block never sees the
## switches and a server is left running when a subtest exits early.
my ($another_kill_switch, $yet_another_kill_switch);
END { $_ and $_->() for ($another_kill_switch, $yet_another_kill_switch) }

subtest 'basics' => sub {
    my %got;
    ok(my $pub = Redis::Fast->new(server => $srv), 'connected to our test redis-server (pub)');
    ok(my $sub = Redis::Fast->new(server => $srv), 'connected to our test redis-server (sub)');

    is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");

    ## Issue a DBSIZE with a callback before subscribing; the result is
    ## checked after the subscribe() call below.
    my $db_size = -1;
    $sub->dbsize(sub { $db_size = $_[0] });


    ## Basic pubsub
    ## Handlers receive (message, topic, subscribed-pattern) and record
    ## "message:topic" under the subscription that matched.
    my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
    $sub->subscribe('aa', 'bb', $sub_cb);
    is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");

    is($db_size, 0, 'subscribing processes pending queued commands');

    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { 'aa' => 'v1:aa' }, "... for the expected topic, 'aa'");

    ## A second handler on the same topic stores under a distinct key
    ## ("2" prefix, upper-cased value) so both handlers can be told apart.
    my $sub_cb2 = sub { my ($v, $t, $s) = @_; $got{"2$s"} = uc("$v:$t") };
    $sub->subscribe('aa', $sub_cb2);
    is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");

    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { 'aa' => 'v1:aa', '2aa' => 'V1:AA' }, "... for the expected topic, 'aa', with two handlers");


    ## Trick subscribe with other messages
    ## The message is published BEFORE psubscribe() so that it is pending
    ## on the connection while the psubscribe replies are being read.
    my $psub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
    %got = ();
    is($pub->publish('aa', 'v2'), 1, "Delivered to 1 subscriber of topic 'aa'");
    $sub->psubscribe('a*', 'c*', $psub_cb);
    cmp_deeply(
        \%got,
        { 'aa' => 'v2:aa', '2aa' => 'V2:AA' },
        '... received message while processing psubscribe(), two handlers'
    );

    is($pub->publish('aa', 'v3'), 2, "Delivered to 2 subscriber of topic 'aa'");
    is($sub->wait_for_messages(1), 2, '... yep, got the expected 2 messages');
    cmp_deeply(
        \%got,
        { 'aa' => 'v3:aa', 'a*' => 'v3:aa', '2aa' => 'V3:AA' },
        "... for the expected subs, 'aa' and 'a*', three handlers total"
    );


    ## Test subscribe/psubscribe diffs
    %got = ();
    is($pub->publish('aaa', 'v4'), 1, "Delivered to 1 subscriber of topic 'aaa'");
    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { 'a*' => 'v4:aaa' }, "... for the expected sub, 'a*'");


    ## Subscriber mode status
    is($sub->is_subscriber, 4, 'Current subscriber has 4 subscriptions active');
    is($pub->is_subscriber, 0, '... the publisher has none');


    ## Unsubscribe
    $sub->unsubscribe('xx', sub { });
    is($sub->is_subscriber, 4, "No match to our subscriptions, unsubscribe doesn't change active count");

    $sub->unsubscribe('aa', $sub_cb);
    is($sub->is_subscriber, 4, "unsubscribe ok, active count is still 4, another handler is alive");

    $sub->unsubscribe('aa', $sub_cb2);
    is($sub->is_subscriber, 3, "unsubscribe done, active count is now 3, both handlers are done");

    $pub->publish('aa', 'v5');
    %got = ();
    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { 'a*' => 'v5:aa' }, "... for the expected key, 'a*'");

    ## unsubscribe() on a pattern subscription is expected to be a no-op;
    ## only punsubscribe() (further down) removes it.
    $sub->unsubscribe('a*', $psub_cb);
    is($sub->is_subscriber, 3, "unsubscribe with topic wildcard failed, active count is now 3");

    $pub->publish('aa', 'v6');
    %got = ();
    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { 'a*' => 'v6:aa' }, "... for the expected key, 'a*'");

    $sub->unsubscribe('bb', $sub_cb);
    is($sub->is_subscriber, 2, "unsubscribe with 'bb' ok, active count is now 2");

    $sub->punsubscribe('a*', $psub_cb);
    is($sub->is_subscriber, 1, "punsubscribe with 'a*' ok, active count is now 1");

    is($pub->publish('aa', 'v6'), 0, "Publish to 'aa' now gives 0 deliveries");
    %got = ();
    is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
    cmp_deeply(\%got, {}, '... and an empty messages recorded set');

    ## While at least one subscription remains, regular commands must die.
    is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
    for my $cmd (qw<ping info keys dbsize shutdown>) {
        like(
            exception { $sub->$cmd },
            qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/,
            ".. still an error to try \U$cmd\E while in SUBSCRIBE mode"
        );
    }
    $sub->punsubscribe('c*', $psub_cb);
    is($sub->is_subscriber, 0, '... but none anymore');

    is(exception { $sub->info }, undef, 'Other commands ok after we leave subscriber_mode');
};

## A topic named '0' is false in boolean context; check it is still delivered.
subtest 'zero_topic' => sub {
    my %got;
    my $pub = Redis::Fast->new(server => $srv);
    my $sub = Redis::Fast->new(server => $srv);

    ## Same callback-style DBSIZE as in 'basics', issued before psubscribe();
    ## its result is not asserted in this subtest.
    my $db_size = -1;
    $sub->dbsize(sub { $db_size = $_[0] });

    my $bad_topic = '0';

    my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
    $sub->psubscribe("$bad_topic*", 'xx', $sub_cb);
    is($pub->publish($bad_topic, 'vBAD'), 1, "Delivered to 1 subscriber of topic '$bad_topic'");

    is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
    cmp_deeply(\%got, { "$bad_topic*" => "vBAD:$bad_topic" }, "... for the expected topic, '$bad_topic'");
};


## wait_for_messages() without a timeout must block: the child is expected
## to still be running when reap() gives up after 11 seconds.
subtest 'wait_for_messages forever' => sub {
    my $pid = fork();
    BAIL_OUT("Fork failed, aborting") unless defined $pid;

    if ($pid) {    ## parent
        diag("parent waiting for child $pid...");
        my $failed = reap($pid, 11);
        if ($failed) {
            ## Child did not exit in time, which is the success case here.
            pass("wait_for_messages wait forever ");
            kill(15, $pid);
            reap($pid) and fail('failed to reap the dead child');
        }
        else {
            fail("wait_for_messages return");
        }
    }
    else {         ## child
        my $sub = Redis::Fast->new(server => $srv);
        $sub->subscribe('chan', sub { });
        $sub->wait_for_messages;    ## never return
        exit(0);
    }
};


## Parent and child synchronise through lists on the main server ($srv):
## the child subscribes on a second server, the parent kills that server,
## then the child checks that wait_for_messages() dies instead of hanging.
subtest 'server is killed while waiting for subscribe' => sub {
    ## Assign the file-scoped kill switch so the END block can clean up.
    ($another_kill_switch, my $another_server) = redis();

    my $pid = fork();
    BAIL_OUT("Fork failed, aborting") unless defined $pid;

    if ($pid) {    ## parent, we'll wait for the child to die quickly
        ok(my $sync = Redis::Fast->new(server => $srv), 'connected to our test redis-server (sync parent)');
        BAIL_OUT('Missed sync while waiting for child') unless defined $sync->blpop('wake_up_parent', 4);

        ok($another_kill_switch->(), "pub/sub redis server killed");
        diag("parent killed pub/sub redis server, signal child to proceed");
        $sync->lpush('wake_up_child', 'the redis-server is dead, do your thing');

        diag("parent waiting for child $pid...");
        my $failed = reap($pid, 5);
        if ($failed) {
            fail("wait_for_messages() hangs when the server goes away...");
            kill(9, $pid);
            reap($pid) and fail('... failed to reap the dead child');
        }
        else {
            pass("wait_for_messages() properly detects a server that dies");
        }
    }
    else {         ## child
        my $sync = Redis::Fast->new(server => $srv);
        my $sub  = Redis::Fast->new(server => $another_server);
        $sub->subscribe('chan', sub { });

        diag("child is ready to test, signal parent to kill our server");
        $sync->lpush('wake_up_parent', 'we are ready on this side, kill the server...');
        die '## Missed sync while waiting for parent' unless defined $sync->blpop('wake_up_child', 4);

        ## This is the test, next wait_for_messages() should not block
        diag("now, check wait_for_messages(), should die...");
        like(
            exception { $sub->wait_for_messages(0) },
            qr/EOF from server|Not connected to any server/,
            "properly died with EOF"
        );
        exit(0);
    }
};

## Same handshake as above, but the parent respawns the second server on the
## same port and publishes; the child (created with reconnect => 10) must
## receive that message after reconnecting.
subtest 'server is restarted while waiting for subscribe' => sub {
    my @ret = redis();
    ## Assign the file-scoped kill switch so the END block can clean up.
    ($another_kill_switch, my $another_server) = @ret;
    ## The port is taken from the last element returned by redis().
    my $port = pop @ret;

    my $pid = fork();
    BAIL_OUT("Fork failed, aborting") unless defined $pid;

    if ($pid) {    ## parent, we'll wait for the child to die quickly

        ok(my $sync = Redis::Fast->new(server => $srv), 'PARENT: connected to our test redis-server (sync parent)');
        BAIL_OUT('Missed sync while waiting for child') unless defined $sync->blpop('wake_up_parent', 4);

        ok($another_kill_switch->(), "PARENT: pub/sub redis server killed");
        diag("PARENT: killed pub/sub redis server, signal child to proceed");
        $sync->lpush('wake_up_child', 'the redis-server is dead, waiting before respawning it');

        sleep 5;

        # relaunch it on the same port
        ($yet_another_kill_switch) = redis(port => $port);
        my $pub = Redis::Fast->new(server => $another_server);

        diag("PARENT: has relaunched the server...");
        sleep 5;

        is($pub->publish('chan', 'v1'), 1, "PARENT: published and the child is subscribed");

        diag("PARENT: waiting for child $pid...");
        my $failed = reap($pid, 5);
        if ($failed) {
            fail("PARENT: wait_for_messages() hangs when the server goes away...");
            kill(9, $pid);
            reap($pid) and fail('PARENT: ... failed to reap the dead child');
        }
        else {
            pass("PARENT: child has properly quit after wait_for_messages()");
        }
        ok($yet_another_kill_switch->(), "PARENT: pub/sub redis server killed");
    }
    else {         ## child
        my $sync = Redis::Fast->new(server => $srv);
        my $sub  = Redis::Fast->new(
            server     => $another_server,
            reconnect  => 10,
            on_connect => sub { diag "CHILD: reconnected (with a 10s timeout)"; }
        );

        my %got;
        $sub->subscribe('chan', sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" });

        diag("CHILD: is ready to test, signal parent to restart our server");
        $sync->lpush('wake_up_parent', 'we are ready on this side, kill the server...');
        die '## Missed sync while waiting for parent' unless defined $sync->blpop('wake_up_child', 4);

        ## This is the test, wait_for_messages() should reconnect to the respawned server
        while (1) {
            diag("CHILD: launch wait_for_messages(2), with reconnect...");
            my $r = $sub->wait_for_messages(2);
            $r and last;
            diag("CHILD: after 2 sec, nothing yet, retrying");
        }
        diag("CHILD: child received the message");
        cmp_deeply(\%got, { 'chan' => 'v1:chan' }, "CHILD: the message is what we want");
        exit(0);
    }
};

## And we are done
done_testing();