package AnyMQ::Trait::AMQP;
use Moose::Role;
use File::ShareDir;

use AnyEvent;
use AnyEvent::RabbitMQ;
use JSON;
use Try::Tiny;
use Carp qw(croak carp);

# Role that backs a message bus with an AMQP broker via AnyEvent::RabbitMQ.
# On construction it connects, opens a channel, optionally declares a topic
# exchange, declares an exclusive queue and starts consuming from it.
# Consumed messages are JSON-decoded and republished on the local topic
# named by the message's routing key.

# Connection parameters; passed straight through to AnyEvent::RabbitMQ's
# connect() under the same names.
has host  => (is => "ro", isa => "Str");
has port  => (is => "ro", isa => "Int");
has user  => (is => "ro", isa => "Str");
has pass  => (is => "ro", isa => "Str");
has vhost => (is => "ro", isa => "Str");

# Name of the topic exchange to declare. When unset (or false) no exchange
# is declared and the queue is set up directly.
has exchange => (is => "ro", isa => "Str");

# Not read anywhere in this file.
# NOTE(review): presumably consulted by the topic-side trait -- confirm.
has bind_mode => (is => "ro", isa => "Str", default => sub { 'exchange' });

# The AnyEvent::RabbitMQ connection object, the open channel, and the name
# of the exclusive queue the broker assigned to us.
has _rf         => (is => "rw");
has _rf_channel => (is => "rw");
has _rf_queue   => (is => "rw");

# Condvar signalling the outcome of the current connection attempt: it
# receives 'init' on success, or whatever the failure callback was given.
has cv => (is => "rw", isa => "AnyEvent::CondVar");

# Optional callback invoked once the connection is fully set up. When it is
# not supplied, object construction blocks until the connection succeeds.
has on_ready => (is => "rw", isa => "CodeRef");

# Set to true once the 'init' signal has been received.
has _connected => (is => "rw", isa => "Bool");

# Returns the path of the AMQP 0-8 XML spec shipped in the
# AnyEvent-RabbitMQ dist share directory.
sub default_amqp_spec { #this is to avoid loading coro
    my $dir = File::ShareDir::dist_dir("AnyEvent-RabbitMQ");
    return "$dir/fixed_amqp0-8.xml";
}

# Load the protocol spec once, at compile/load time of this role.
AnyEvent::RabbitMQ->load_xml_spec(default_amqp_spec());

# Empty BUILD stub so the 'after' modifier below always has a method to wrap.
sub BUILD {}; after 'BUILD' => sub {
    my $self = shift;

    my $cv = $self->cv(AE::cv);

    $self->connect($cv);

    # Callback attached to every connection-attempt condvar. On 'init' the
    # bus is marked connected; on anything else a fresh condvar is installed
    # and the connection is retried after 5 seconds.
    my $cb; $cb = sub {
        my $msg = $_[0]->recv;

        # A failure callback may fire without a reason; avoid
        # "uninitialized value" warnings in the comparison and message.
        my $reason = defined $msg ? $msg : 'unknown';

        if ( $reason eq 'init' ) {
            $self->_connected(1);
            $self->on_ready->() if $self->on_ready;
        }
        else {
            my $retry_cv = AE::cv;
            $retry_cv->cb($cb);
            $self->cv($retry_cv);
            carp "Connection failed, retrying in 5 seconds. \nReason: ".$reason;

            # The timer watcher must stay referenced until it fires; the
            # callback drops it again once the retry has been started.
            my $w; $w = AnyEvent->timer(
                after => 5,
                cb    => sub {
                    undef $w;
                    $self->connect($retry_cv);
                },
            );
        }
    };
    $cv->cb($cb);

    # Without an on_ready callback, block here until some attempt succeeds.
    # $self->cv is re-read on every iteration because a failed attempt
    # replaces it with a fresh condvar.
    if (!$self->on_ready) {
        while (1) {
            my $msg = $self->cv->recv;
            last if defined $msg && $msg eq 'init';
        }
    }
};

# connect($cv)
#
# Starts one asynchronous connection attempt. On full success 'init' is sent
# to $cv. $cv itself is handed over as every on_failure callback, so a
# failure delivers its arguments through the same condvar.
sub connect {
    my $self = shift;
    my $cv = shift;

    my $rf = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
    $self->_rf($rf);

    # XXX: wrapped object with monadic method modifier
    # my $channel = run_monad { $rf->connect(....)->open_channel()->return }
    # my $queue = run_monad { $channel->declare_queue(....)->return }->method_frame->queue;
    # run_monad { $channel->consume( ....) }

    # Declares the exclusive queue on the given channel, remembers its name
    # and starts consuming from it; signals 'init' once consuming is set up.
    my $init = sub {
        my $channel = shift;
        $channel->declare_queue(
            exclusive  => 1,
            on_success => sub {
                my $method = shift;
                my $queue = $method->method_frame->queue;
                $self->_rf_queue($queue);
                $channel->consume(
                    queue      => $queue,
                    no_ack     => 1,
                    on_success => sub {
                        $cv->send('init');
                    },
                    on_consume => $self->on_consume,
                    on_failure => $cv,
                );
            },
            on_failure => $cv,
        )
    };

    $rf->connect(
        (map { $_ => $self->$_ }
             qw(host port user pass vhost)),
        on_success => sub {
            $rf->open_channel(
                on_success => sub {
                    my $channel = shift;
                    $self->_rf_channel($channel);
                    $channel->qos();

                    # No exchange configured: go straight to queue setup.
                    return $init->($channel)
                        unless $self->exchange;

                    $channel->declare_exchange(
                        type       => 'topic',
                        exchange   => $self->exchange,
                        on_failure => $cv,
                        on_success => sub {
                            $init->($channel);
                        },
                    );
                },
                on_failure => $cv,
            );
        },
        on_close => sub {
            # XXX: try to reconnect and reinstantiate all topics
            warn "==> connection closed";
        },
        on_failure => $cv,
    );
}

# Returns the code ref used as the channel's on_consume handler. The handler
# JSON-decodes each delivered payload and republishes it on the local topic
# named by the delivery's routing key. It croaks if republishing fails.
sub on_consume {
    my $self = shift;
    sub {
        my $frame = shift;
        my $payload = $frame->{body}->payload;
        my $reply_to = $frame->{header}->reply_to;

        # Ignore messages whose reply_to is our own queue name.
        # NOTE(review): presumably these are messages this process
        # published itself -- confirm against the publishing side.
        return if $reply_to && $reply_to eq $self->_rf_queue;

        my $topic = $frame->{deliver}->method_frame->routing_key;

        # The fully qualified method name calls AnyMQ::Topic::publish
        # directly instead of resolving publish() on the topic object.
        try { $self->topics->{$topic}->AnyMQ::Topic::publish(JSON::from_json($payload)) }
        catch { croak "failed to republish on $topic: $_" };
    };
}

# new_topic($opt)
#
# Builds an AnyMQ::Topic with the 'AMQP' trait attached to this bus. $opt is
# either a hash ref of constructor arguments or a plain topic name.
sub new_topic {
    my ($self, $opt) = @_;
    $opt = { name => $opt } unless ref $opt;
    AnyMQ::Topic->new_with_traits(
        traits => ['AMQP'],
        %$opt,
        bus    => $self );
}

# Empty DEMOLISH stub so the 'after' modifier below always has a method to
# wrap. On destruction the connection is closed and we wait for the close to
# finish, except during global destruction or when no connection exists.
sub DEMOLISH {}; after 'DEMOLISH' => sub {
    my $self = shift;
    my ($igd) = @_;    # the in-global-destruction flag Moose passes to DEMOLISH
    return unless $self->_rf;
    return if $igd;
    my $q = AE::cv;
    $self->_rf->close( on_success => $q, on_failure => $q );
    $q->recv;
};

1;