1use strict; 2use warnings; 3use Test::More; 4use AnyEvent; 5use AnyMQ; 6use AnyMQ::Queue; 7 8my $tests = 2; 9 10my $sequence = 0; 11 12sub do_test { 13 my ( $channel, $client ) = @_; 14 my $seq = ++$sequence; 15 my @send_events = ( { data1 => $seq }, { data2 => $seq }, { data3 => $seq } ); 16 17 my $cv = AE::cv; 18 my $t = AE::timer 5, 0, sub { $cv->croak( "timeout" ); }; 19 20 my $pub = AnyMQ->topic( $channel ); 21 my $sub = AnyMQ->new_listener( $pub ); 22 $sub->on_error(sub { 23 my ($queue, $error, @msg) = @_; 24 $queue->persistent(0); 25 $queue->append(@msg); 26 }); 27 $sub->poll(sub { 28 if ($_[0]{data2}) { 29 die "poll fail"; 30 } 31 is $_[0]{data1}, $seq 32 }); 33 $sub->timeout(1); 34 $sub->on_timeout(sub { 35 isa_ok($_[0], 'AnyMQ::Queue'); 36 ok(1, 'timeout triggered after downgrade'); 37 like($_[1], qr'timeout'); 38 $cv->send(1); 39 }, 40 ); 41 # Publish events before the client has connected. 42 $pub->publish( $_ ) for @send_events; 43 $cv->recv; 44 ok( !$sub->destroyed ); 45 $cv = AE::cv; 46 $sub->on_timeout(undef); 47 $sub->poll_once(sub { 48 my @events = @_; 49 is_deeply \@events, [@send_events[1,2]], "got events"; 50 $cv->send; 51 }); 52 $cv->recv; 53 54} 55 56plan tests => 6 * $tests; 57do_test( 'comet', 'client_id' ) for 1 .. $tests; 58