1package AnyEvent::RabbitMQ::LocalQueue;
2
3use strict;
4use warnings;
5
6our $VERSION = '1.22'; # VERSION
7
# Constructor. Builds an empty local queue object.
#
# The instance carries two array references:
#   _message_queue    - messages that have been pushed but not yet delivered
#   _drain_code_queue - callbacks registered via get() that have not yet
#                       been handed a message
#
# Takes the class name as its only argument and returns the blessed hash.
sub new {
    my ($class) = @_;

    my %state = (
        _message_queue    => [],
        _drain_code_queue => [],
    );

    return bless \%state, $class;
}
15
# Enqueue one or more messages and immediately try to deliver them.
#
# Every argument after the invocant is appended, in order, to the end of
# _message_queue. _drain_queue is then run so that any callbacks already
# waiting receive messages straight away.
#
# Returns the result of _drain_queue.
sub push {
    my ($self, @messages) = @_;

    # CORE:: is required because this package defines its own push().
    CORE::push @{ $self->{_message_queue} }, @messages;

    return $self->_drain_queue;
}
22
# Register one or more callbacks that each want to receive a message.
#
# Every argument after the invocant is appended, in order, to the end of
# _drain_code_queue. _drain_queue is then run so that any messages already
# waiting are handed to the new callbacks straight away.
#
# Returns the result of _drain_queue.
sub get {
    my ($self, @callbacks) = @_;

    CORE::push @{ $self->{_drain_code_queue} }, @callbacks;

    return $self->_drain_queue;
}
29
# Pair queued messages with queued callbacks, oldest first.
#
# Each pass removes one callback from the front of _drain_code_queue and one
# message from the front of _message_queue, then invokes the callback with
# that message as its only argument. Draining stops as soon as either queue
# is empty.
#
# Both queues are re-checked before every delivery instead of relying on a
# count taken up front. A callback may re-enter this object (for example by
# calling get or push, which drain again); with a stale count the outer loop
# would then hand an undefined message to a freshly queued callback, or try
# to call an undefined callback and die.
#
# An exception thrown by a callback propagates to the caller; the callback
# and message involved have already been removed from their queues.
#
# Returns $self.
sub _drain_queue {
    my ($self) = @_;

    while (   @{ $self->{_message_queue} }
           && @{ $self->{_drain_code_queue} }) {
        my $callback = shift @{ $self->{_drain_code_queue} };
        my $message  = shift @{ $self->{_message_queue} };

        $callback->($message);
    }

    return $self;
}
47
# Empty the callback queue, giving every waiting callback something to run on.
#
# First runs _drain_queue so that callbacks are matched with any messages
# still queued. Every callback left over after that is removed from
# _drain_code_queue and invoked with $frame as its only argument.
#
# Errors raised by those leftover callbacks are deliberately discarded, and
# $@ is localized so the caller's value is not clobbered. The initial
# _drain_queue call is not wrapped, so its exceptions still propagate.
sub _flush {
    my $self  = shift;
    my $frame = shift;

    $self->_drain_queue();

    while (my $handler = shift @{ $self->{_drain_code_queue} }) {
        # Flush frames immediately, throwing away errors for on-close.
        local $@;
        eval { $handler->($frame) };
    }
}
58
591;
60
61