1#!perl
2
3use warnings;
4use strict;
5use Test::More;
6use Test::Fatal;
7use Test::Deep;
8use Test::SharedFork;
9use Redis::Fast;
10use lib 't/tlib';
11use Test::SpawnRedisServer qw( redis reap );
12
13my ($c, $srv) = redis();
14END { $c->() if $c }
15{
16my $r = Redis::Fast->new(server => $srv);
17eval { $r->publish( 'aa', 'v1' ) };
18plan 'skip_all' => "pubsub not implemented on this redis server"  if $@ && $@ =~ /unknown command/;
19}
20
21my ($another_kill_switch, $yet_another_kill_switch);
22END { $_ and $_->() for($another_kill_switch, $yet_another_kill_switch) }
23
24subtest 'basics' => sub {
25  my %got;
26  ok(my $pub = Redis::Fast->new(server => $srv), 'connected to our test redis-server (pub)');
27  ok(my $sub = Redis::Fast->new(server => $srv), 'connected to our test redis-server (sub)');
28
29  is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
30
31  my $db_size = -1;
32  $sub->dbsize(sub { $db_size = $_[0] });
33
34
35  ## Basic pubsub
36  my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
37  $sub->subscribe('aa', 'bb', $sub_cb);
38  is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
39
40  is($db_size, 0, 'subscribing processes pending queued commands');
41
42  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
43  cmp_deeply(\%got, { 'aa' => 'v1:aa' }, "... for the expected topic, 'aa'");
44
45  my $sub_cb2 = sub { my ($v, $t, $s) = @_; $got{"2$s"} = uc("$v:$t") };
46  $sub->subscribe('aa', $sub_cb2);
47  is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
48
49  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
50  cmp_deeply(\%got, { 'aa' => 'v1:aa', '2aa' => 'V1:AA' }, "... for the expected topic, 'aa', with two handlers");
51
52
53  ## Trick subscribe with other messages
54  my $psub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
55  %got = ();
56  is($pub->publish('aa', 'v2'), 1, "Delivered to 1 subscriber of topic 'aa'");
57  $sub->psubscribe('a*', 'c*', $psub_cb);
58  cmp_deeply(
59    \%got,
60    { 'aa' => 'v2:aa', '2aa' => 'V2:AA' },
61    '... received message while processing psubscribe(), two handlers'
62  );
63
64  is($pub->publish('aa', 'v3'), 2, "Delivered to 2 subscriber of topic 'aa'");
65  is($sub->wait_for_messages(1), 2, '... yep, got the expected 2 messages');
66  cmp_deeply(
67    \%got,
68    { 'aa' => 'v3:aa', 'a*' => 'v3:aa', '2aa' => 'V3:AA' },
69    "... for the expected subs, 'aa' and 'a*', three handlers total"
70  );
71
72
73  ## Test subscribe/psubscribe diffs
74  %got = ();
75  is($pub->publish('aaa', 'v4'), 1, "Delivered to 1 subscriber of topic 'aaa'");
76  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
77  cmp_deeply(\%got, { 'a*' => 'v4:aaa' }, "... for the expected sub, 'a*'");
78
79
80  ## Subscriber mode status
81  is($sub->is_subscriber, 4, 'Current subscriber has 4 subscriptions active');
82  is($pub->is_subscriber, 0, '... the publisher has none');
83
84
85  ## Unsubscribe
86  $sub->unsubscribe('xx', sub { });
87  is($sub->is_subscriber, 4, "No match to our subscriptions, unsubscribe doesn't change active count");
88
89  $sub->unsubscribe('aa', $sub_cb);
90  is($sub->is_subscriber, 4, "unsubscribe ok, active count is still 4, another handler is alive");
91
92  $sub->unsubscribe('aa', $sub_cb2);
93  is($sub->is_subscriber, 3, "unsubscribe done, active count is now 3, both handlers are done");
94
95  $pub->publish('aa', 'v5');
96  %got = ();
97  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
98  cmp_deeply(\%got, { 'a*', 'v5:aa' }, "... for the expected key, 'a*'");
99
100  $sub->unsubscribe('a*', $psub_cb);
101  is($sub->is_subscriber, 3, "unsubscribe with topic wildcard failed, active count is now 3");
102
103  $pub->publish('aa', 'v6');
104  %got = ();
105  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
106  cmp_deeply(\%got, { 'a*', 'v6:aa' }, "... for the expected key, 'a*'");
107
108  $sub->unsubscribe('bb', $sub_cb);
109  is($sub->is_subscriber, 2, "unsubscribe with 'bb' ok, active count is now 2");
110
111  $sub->punsubscribe('a*', $psub_cb);
112  is($sub->is_subscriber, 1, "punsubscribe with 'a*' ok, active count is now 1");
113
114  is($pub->publish('aa', 'v6'), 0, "Publish to 'aa' now gives 0 deliveries");
115  %got = ();
116  is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
117  cmp_deeply(\%got, {}, '... and an empty messages recorded set');
118
119  is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
120  for my $cmd (qw<ping info keys dbsize shutdown>) {
121    like(
122      exception { $sub->$cmd },
123      qr/Cannot use command '(?i:$cmd)' while in SUBSCRIBE mode/,
124      ".. still an error to try \U$cmd\E while in SUBSCRIBE mode"
125    );
126  }
127  $sub->punsubscribe('c*', $psub_cb);
128  is($sub->is_subscriber, 0, '... but none anymore');
129
130  is(exception { $sub->info }, undef, 'Other commands ok after we leave subscriber_mode');
131};
132
133subtest 'zero_topic' => sub {
134  my %got;
135  my $pub = Redis::Fast->new(server => $srv);
136  my $sub = Redis::Fast->new(server => $srv);
137
138  my $db_size = -1;
139  $sub->dbsize(sub { $db_size = $_[0] });
140
141  my $bad_topic = '0';
142
143  my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
144  $sub->psubscribe("$bad_topic*", 'xx', $sub_cb);
145  is($pub->publish($bad_topic, 'vBAD'), 1, "Delivered to 1 subscriber of topic '$bad_topic'");
146
147  is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
148  cmp_deeply(\%got, { "$bad_topic*" => "vBAD:$bad_topic" }, "... for the expected topic, '$bad_topic'");
149};
150
151
152subtest 'wait_for_messages forever' => sub {
153  my $pid = fork();
154  BAIL_OUT("Fork failed, aborting") unless defined $pid;
155
156  if ($pid) {  ## parent
157    diag("parent waiting for child $pid...");
158    my $failed = reap($pid, 11);
159    if ($failed) {
160      pass("wait_for_messages wait forever ");
161      kill(15, $pid);
162      reap($pid) and fail('failed to reap the dead child');
163    }
164    else {
165      fail("wait_for_messages return");
166    }
167  } else {
168    my $sub = Redis::Fast->new(server => $srv);
169    $sub->subscribe('chan', sub { });
170    $sub->wait_for_messages; ## never return
171    exit(0);
172  }
173};
174
175
176subtest 'server is killed while waiting for subscribe' => sub {
177  my ($another_kill_switch, $another_server) = redis();
178
179  my $pid = fork();
180  BAIL_OUT("Fork failed, aborting") unless defined $pid;
181
182  if ($pid) {    ## parent, we'll wait for the child to die quickly
183    ok(my $sync = Redis::Fast->new(server => $srv), 'connected to our test redis-server (sync parent)');
184    BAIL_OUT('Missed sync while waiting for child') unless defined $sync->blpop('wake_up_parent', 4);
185
186    ok($another_kill_switch->(), "pub/sub redis server killed");
187    diag("parent killed pub/sub redis server, signal child to proceed");
188    $sync->lpush('wake_up_child', 'the redis-server is dead, do your thing');
189
190    diag("parent waiting for child $pid...");
191    my $failed = reap($pid, 5);
192    if ($failed) {
193      fail("wait_for_messages() hangs when the server goes away...");
194      kill(9, $pid);
195      reap($pid) and fail('... failed to reap the dead child');
196    }
197    else {
198      pass("wait_for_messages() properly detects a server that dies");
199    }
200  }
201  else {    ## child
202    my $sync = Redis::Fast->new(server => $srv);
203    my $sub  = Redis::Fast->new(server => $another_server);
204    $sub->subscribe('chan', sub { });
205
206    diag("child is ready to test, signal parent to kill our server");
207    $sync->lpush('wake_up_parent', 'we are ready on this side, kill the server...');
208    die '## Missed sync while waiting for parent' unless defined $sync->blpop('wake_up_child', 4);
209
210    ## This is the test, next wait_for_messages() should not block
211    diag("now, check wait_for_messages(), should die...");
212    like(
213      exception { $sub->wait_for_messages(0) },
214      qr/EOF from server|Not connected to any server/,
215      "properly died with EOF"
216    );
217    exit(0);
218  }
219};
220
221subtest 'server is restarted while waiting for subscribe' => sub {
222  my @ret = redis();
223  my ($another_kill_switch, $another_server) = @ret;
224  my $port = pop @ret;
225
226  my $pid = fork();
227  BAIL_OUT("Fork failed, aborting") unless defined $pid;
228
229  if ($pid) {    ## parent, we'll wait for the child to die quickly
230
231    ok(my $sync = Redis::Fast->new(server => $srv), 'PARENT: connected to our test redis-server (sync parent)');
232    BAIL_OUT('Missed sync while waiting for child') unless defined $sync->blpop('wake_up_parent', 4);
233
234    ok($another_kill_switch->(), "PARENT: pub/sub redis server killed");
235    diag("PARENT: killed pub/sub redis server, signal child to proceed");
236    $sync->lpush('wake_up_child', 'the redis-server is dead, waiting before respawning it');
237
238    sleep 5;
239
240    # relaunch it on the same port
241    my ($yet_another_kill_switch) = redis(port => $port);
242    my $pub  = Redis::Fast->new(server => $another_server);
243
244    diag("PARENT: has relaunched the server...");
245    sleep 5;
246
247    is($pub->publish('chan', 'v1'), 1, "PARENT: published and the child is subscribed");
248
249    diag("PARENT: waiting for child $pid...");
250    my $failed = reap($pid, 5);
251    if ($failed) {
252      fail("PARENT: wait_for_messages() hangs when the server goes away...");
253      kill(9, $pid);
254      reap($pid) and fail('PARENT: ... failed to reap the dead child');
255    }
256    else {
257      pass("PARENT: child has properly quit after wait_for_messages()");
258    }
259    ok($yet_another_kill_switch->(), "PARENT: pub/sub redis server killed");
260  }
261  else {    ## child
262    my $sync = Redis::Fast->new(server => $srv);
263    my $sub  = Redis::Fast->new(server => $another_server,
264                          reconnect => 10,
265                          on_connect => sub { diag "CHILD: reconnected (with a 10s timeout)"; }
266                         );
267
268    my %got;
269    $sub->subscribe('chan', sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" });
270
271    diag("CHILD: is ready to test, signal parent to restart our server");
272    $sync->lpush('wake_up_parent', 'we are ready on this side, kill the server...');
273    die '## Missed sync while waiting for parent' unless defined $sync->blpop('wake_up_child', 4);
274
275    ## This is the test, wait_for_messages() should reconnect to the respawned server
276    while (1) {
277        diag("CHILD: launch wait_for_messages(2), with reconnect...");
278        my $r = $sub->wait_for_messages(2);
279        $r and last;
280        diag("CHILD: after 2 sec, nothing yet, retrying");
281    }
282    diag("CHILD: child received the message");
283    cmp_deeply(\%got, { 'chan' => 'v1:chan' }, "CHILD: the message is what we want");
284    exit(0);
285  }
286};
287
288## And we are done
289done_testing();
290