1package AnyMQ; 2use strict; 3use 5.008_001; 4our $VERSION = '0.35'; 5 6use AnyEvent; 7use Any::Moose; 8use AnyMQ::Topic; 9use AnyMQ::Queue; 10 11with any_moose("X::Traits"); 12 13has '+_trait_namespace' => (default => 'AnyMQ::Trait'); 14 15has topics => (is => "ro", isa => "HashRef[AnyMQ::Topic]", 16 default => sub { {} }); 17 18my $DEFAULT_BUS; 19 20sub topic { 21 my ($self, $opt) = @_; 22 $opt = { name => $opt } unless ref $opt; 23 $opt->{recycle} = 1 unless exists $opt->{recycle}; 24 25 unless (ref($self)) { 26 $self = ($DEFAULT_BUS ||= $self->new); 27 } 28 29 $self->topics->{$opt->{name}} ||= $self->new_topic( $opt ); 30} 31 32sub new_topic { 33 my ($self, $opt) = @_; 34 $opt = { name => $opt } unless ref $opt; 35 AnyMQ::Topic->new( %$opt, 36 bus => $self ); 37} 38 39sub new_listener { 40 my $self = shift; 41 unless (ref($self)) { 42 $self = ($DEFAULT_BUS ||= $self->new); 43 } 44 45 my $listener = AnyMQ::Queue->new; 46 if (@_) { 47 $listener->subscribe($_) 48 for @_; 49 } 50 return $listener; 51} 52 53__PACKAGE__->meta->make_immutable; 54no Any::Moose; 551; 56 57__END__ 58 59=encoding utf-8 60 61=for stopwords 62 63=head1 NAME 64 65AnyMQ - Non-blocking message queue system based on AnyEvent 66 67=head1 SYNOPSIS 68 69 use AnyMQ; 70 my $mq = AnyMQ->topic('Foo'); # gets an AnyMQ::Topic object 71 $mq->publish({ message => 'Hello world'}); 72 73 # bind to external message queue servers using traits. 74 # my $bus = AnyMQ->new_with_traits(traits => ['AMQP'], 75 # host => 'localhost', 76 # port => 5672, 77 # user => 'guest', 78 # pass => 'guest', 79 # vhost => '/', 80 # exchange => ''); 81 # my $mq = $bus->topic('foo') 82 83 $mq->publish({ message => 'Hello world'}); 84 85 # $bus->new_listener('client_id', $mq); 86 87=head1 DESCRIPTION 88 89AnyMQ is message queue system based on AnyEvent. It can store all 90messages in memory or use external message queue servers. 91 92Messages are published to L<AnyMQ::Topic>, and consumed with 93L<AnyMQ::Queue>. 94 95=head1 METHODS 96 97=head2 new 98 99Returns a new L<AnyMQ> object, which is a message bus that can 100associate with arbitrary L<AnyMQ::Topic> and consumed by 101L<AnyMQ::Queue> 102 103=head2 topic($name or %opt) 104 105Returns a L<AnyMQ::Topic> with given name or constructor options 106C<%opt>. If called as class method, the default bus will be used. 107Topics not known to the current AnyMQ bus will be created. 108 109=head2 new_topic($name or %opt) 110 111Creates and returns a new L<AnyMQ::Topic> object with given name or 112constructor options C<%opt>. This should not be called directly. 113 114=head2 new_listener(@topic) 115 116Returns a new L<AnyMQ::Queue> object, and subscribes to the optional 117given topic. If called as class method, the default bus will be used. 118 119=head1 AUTHORS 120 121Tatsuhiko Miyagawa 122Chia-liang Kao 123 124=head1 LICENSE 125 126This library is free software; you can redistribute it and/or modify 127it under the same terms as Perl itself. 128 129=head1 SEE ALSO 130 131L<AnyMQ::Topic>, L<AnyMQ::Queue> 132 133=cut 134