1use strict;
2use warnings;
3use Test::More;
4use AnyEvent;
5use AnyMQ;
6use AnyMQ::Queue;
7
8my $tests = 2;
9
10my $sequence = 0;
11
12sub do_test {
13    my ( $channel, $client ) = @_;
14    my $seq = ++$sequence;
15    my @send_events = ( { data1 => $seq }, { data2 => $seq }, { data3 => $seq } );
16
17    my $cv = AE::cv;
18    my $t  = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };
19
20    my $pub = AnyMQ->topic( $channel );
21    my $sub = AnyMQ->new_listener( $pub );
22    $sub->on_error(sub {
23                       my ($queue, $error, @msg) = @_;
24                       $queue->persistent(0);
25                       $queue->append(@msg);
26                   });
27    $sub->poll(sub {
28                   if ($_[0]{data2}) {
29                       die "poll fail";
30                   }
31                   is $_[0]{data1}, $seq
32               });
33    $sub->timeout(1);
34    $sub->on_timeout(sub {
35                       isa_ok($_[0], 'AnyMQ::Queue');
36                       ok(1, 'timeout triggered after downgrade');
37                       like($_[1], qr'timeout');
38                       $cv->send(1);
39                   },
40               );
41    # Publish events before the client has connected.
42    $pub->publish( $_ ) for @send_events;
43    $cv->recv;
44    ok( !$sub->destroyed );
45    $cv = AE::cv;
46    $sub->on_timeout(undef);
47    $sub->poll_once(sub {
48                        my @events = @_;
49                        is_deeply \@events, [@send_events[1,2]], "got events";
50                        $cv->send;
51                    });
52    $cv->recv;
53
54}
55
56plan tests => 6 * $tests;
57do_test( 'comet', 'client_id' ) for 1 .. $tests;
58