1package AnyMQ::Trait::AMQP;
2use Moose::Role;
3use File::ShareDir;
4
5use AnyEvent;
6use AnyEvent::RabbitMQ;
7use JSON;
8use Try::Tiny;
9use Carp qw(croak carp);
10
# Broker endpoint and credentials. connect() forwards these five values,
# under the same names, to AnyEvent::RabbitMQ's connect().
has host  => (is => 'ro', isa => 'Str');
has port  => (is => 'ro', isa => 'Int');
has user  => (is => 'ro', isa => 'Str');
has pass  => (is => 'ro', isa => 'Str');
has vhost => (is => 'ro', isa => 'Str');

# Optional exchange name. When set, connect() declares it as a 'topic'
# exchange before declaring the queue.
has exchange => (is => 'ro', isa => 'Str');

# NOTE(review): not read by any code visible here; presumably consumed by
# the topic-side trait -- confirm.
has bind_mode => (is => 'ro', isa => 'Str', default => sub { 'exchange' });

# Live AnyEvent::RabbitMQ handles, filled in by connect(): the connection
# object, the opened channel, and the name of the exclusive queue.
has _rf         => (is => 'rw');
has _rf_channel => (is => 'rw');
has _rf_queue   => (is => 'rw');

# Condvar of the connection attempt currently in flight; it receives
# 'init' on success.
has cv => (is => 'rw', isa => 'AnyEvent::CondVar');

# Optional callback invoked once the connection is up. When it is absent,
# object construction blocks until the connection is up.
has on_ready => (is => 'rw', isa => 'CodeRef');

# Set to true after the first successful connection.
has _connected => (is => 'rw', isa => 'Bool');
29
# Return the path of the "fixed_amqp0-8.xml" spec file located in the share
# directory of the AnyEvent-RabbitMQ distribution.
# (Original author's note: this exists to avoid loading Coro.)
sub default_amqp_spec {
    my $share_dir = File::ShareDir::dist_dir('AnyEvent-RabbitMQ');
    return join '/', $share_dir, 'fixed_amqp0-8.xml';
}
34
# Register the bundled AMQP spec with AnyEvent::RabbitMQ once, when this
# module is loaded.
{
    my $spec_file = default_amqp_spec();
    AnyEvent::RabbitMQ->load_xml_spec($spec_file);
}
36
# Empty stub so the modifier below always has a BUILD to wrap.
sub BUILD { }

# After construction: start connecting to the broker and keep retrying
# every 5 seconds until the connection reports 'init'.
#
# With an on_ready callback the constructor returns immediately and the
# callback fires once connected; without one, construction blocks until
# the connection is up.
after 'BUILD' => sub {
    my ($self) = @_;

    my $first_attempt = $self->cv(AE::cv);
    $self->connect($first_attempt);

    # Handles the outcome of one attempt. It re-registers itself on the
    # condvar of every retry.
    # NOTE(review): the closure refers to its own variable, which forms a
    # reference cycle -- worth checking whether that keeps $self alive.
    my $on_outcome;
    $on_outcome = sub {
        my ($finished) = @_;
        my $outcome = $finished->recv;

        if ($outcome eq 'init') {
            $self->_connected(1);
            if (my $ready = $self->on_ready) {
                $ready->();
            }
            return;
        }

        # Failure: publish a fresh condvar for the next attempt before
        # warning, then reconnect from a one-shot timer.
        my $retry_cv = AE::cv;
        $retry_cv->cb($on_outcome);
        $self->cv($retry_cv);
        carp "Connection failed, retrying in 5 seconds.  Reason: " . $outcome;

        my $timer;
        $timer = AnyEvent->timer(
            after => 5,
            cb    => sub {
                undef $timer;    # release the timer once it has fired
                $self->connect($retry_cv);
            },
        );
    };
    $first_attempt->cb($on_outcome);

    # No on_ready callback: wait here, always on whichever condvar is
    # current, until an attempt succeeds.
    unless ($self->on_ready) {
        1 until $self->cv->recv eq 'init';
    }
};
67
# Open a new broker connection and start consuming from a private queue.
#
# Arguments:
#   $cv - condvar that reports the outcome. It is sent 'init' once the
#         consumer is registered, and it is installed directly as the
#         on_failure handler of every step.
#
# Steps, each started from the previous step's success callback:
#   connect -> open_channel -> qos -> declare_exchange (only when an
#   exchange is configured) -> declare_queue (exclusive) -> consume.
#
# Returns whatever AnyEvent::RabbitMQ's connect() returns.
sub connect {
    my ($self, $cv) = @_;

    my $rf = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
    $self->_rf($rf);

    # XXX: wrapped object with monadic method modifier
    # my $channel = run_monad { $rf->connect(....)->open_channel()->return }
    # my $queue = run_monad { $channel->declare_queue(....)->return }->method_frame->queue;
    # run_monad { $channel->consume( ....) }

    # Final stage: declare the exclusive queue, remember its name, and
    # consume from it without acks.
    my $start_consuming = sub {
        my ($channel) = @_;

        my $on_queue_declared = sub {
            my ($reply) = @_;
            my $queue_name = $reply->method_frame->queue;
            $self->_rf_queue($queue_name);

            $channel->consume(
                queue      => $queue_name,
                no_ack     => 1,
                on_success => sub { $cv->send('init') },
                on_consume => $self->on_consume,
                on_failure => $cv,
            );
        };

        $channel->declare_queue(
            exclusive  => 1,
            on_success => $on_queue_declared,
            on_failure => $cv,
        );
    };

    # Middle stage: record the channel, then go through the exchange
    # declaration only when an exchange name is configured.
    my $on_channel_open = sub {
        my ($channel) = @_;
        $self->_rf_channel($channel);
        $channel->qos();

        my $exchange = $self->exchange;
        return $start_consuming->($channel) unless $exchange;

        $channel->declare_exchange(
            type       => 'topic',
            exchange   => $exchange,
            on_failure => $cv,
            on_success => sub { $start_consuming->($channel) },
        );
    };

    my @endpoint = map { ($_ => $self->$_) } qw(host port user pass vhost);

    return $rf->connect(
        @endpoint,
        on_success => sub {
            $rf->open_channel(
                on_success => $on_channel_open,
                on_failure => $cv,
            );
        },
        on_close => sub {
            # XXX: try to reconnect and reinstantiate all topics
            warn "==> connection closed";
        },
        on_failure => $cv,
    );
}
132
# Build the delivery callback that connect() hands to $channel->consume.
#
# Returns a code ref taking one delivery: a hash ref whose `body`,
# `header` and `deliver` entries are read below. The callback:
#   * skips deliveries whose reply_to equals this bus's own queue name
#     (presumably messages this bus published itself -- confirm against
#     the publishing side);
#   * otherwise JSON-decodes the payload and publishes it to the local
#     topic named by the routing key;
#   * croaks with the topic name and the underlying error if decoding or
#     publishing fails.
sub on_consume {
    my ($self) = @_;

    return sub {
        my ($frame) = @_;

        my $payload  = $frame->{body}->payload;
        my $reply_to = $frame->{header}->reply_to;
        return if $reply_to && $reply_to eq $self->_rf_queue;

        my $topic = $frame->{deliver}->method_frame->routing_key;

        # The fully qualified method name calls AnyMQ::Topic's own publish
        # directly, whatever publish the topic object's class would
        # otherwise resolve to.
        try {
            $self->topics->{$topic}
                ->AnyMQ::Topic::publish(JSON::from_json($payload));
        }
        catch {
            croak "failed to republish on $topic: $_";
        };
    };
}
145
# Create an AnyMQ::Topic with the 'AMQP' trait, bound to this bus.
#
# Arguments:
#   $opt - either a plain topic name, or a hash ref of constructor
#          arguments. Entries in the hash ref may override `traits`;
#          `bus` is always this object.
#
# Returns the result of AnyMQ::Topic->new_with_traits.
sub new_topic {
    my ($self, $opt) = @_;

    my $ctor_args = ref $opt ? $opt : { name => $opt };

    return AnyMQ::Topic->new_with_traits(
        traits => ['AMQP'],
        %{$ctor_args},
        bus => $self,
    );
}
154
# Empty stub so the modifier below always has a DEMOLISH to wrap.
sub DEMOLISH { }

# On destruction: close the broker connection and block until the close
# has either succeeded or failed. Nothing is done when no connection
# object exists or when the second argument (Moose's
# in-global-destruction flag) is true.
after 'DEMOLISH' => sub {
    my ($self, $in_global_destruction) = @_;

    my $rf = $self->_rf;
    return if !$rf || $in_global_destruction;

    # The same condvar serves both outcomes, so recv returns either way.
    my $closed = AE::cv;
    $rf->close(on_success => $closed, on_failure => $closed);
    $closed->recv;
};
164
1651;
166