1package AnyEvent::RabbitMQ::LocalQueue; 2 3use strict; 4use warnings; 5 6our $VERSION = '1.22'; # VERSION 7 8sub new { 9 my $class = shift; 10 return bless { 11 _message_queue => [], 12 _drain_code_queue => [], 13 }, $class; 14} 15 16sub push { 17 my $self = shift; 18 19 CORE::push @{$self->{_message_queue}}, @_; 20 return $self->_drain_queue(); 21} 22 23sub get { 24 my $self = shift; 25 26 CORE::push @{$self->{_drain_code_queue}}, @_; 27 return $self->_drain_queue(); 28} 29 30sub _drain_queue { 31 my $self = shift; 32 33 my $message_count = scalar @{$self->{_message_queue}}; 34 my $drain_code_count = scalar @{$self->{_drain_code_queue}}; 35 36 my $count = $message_count < $drain_code_count 37 ? $message_count : $drain_code_count; 38 39 for (1 .. $count) { 40 &{shift @{$self->{_drain_code_queue}}}( 41 shift @{$self->{_message_queue}} 42 ); 43 } 44 45 return $self; 46} 47 48sub _flush { 49 my ($self, $frame) = @_; 50 51 $self->_drain_queue; 52 53 while (my $cb = shift @{$self->{_drain_code_queue}}) { 54 local $@; # Flush frames immediately, throwing away errors for on-close 55 eval { $cb->($frame) }; 56 } 57} 58 591; 60 61