1package AnyEvent::RabbitMQ::Channel;
2
3use strict;
4use warnings;
5
6use AnyEvent::RabbitMQ::LocalQueue;
7use AnyEvent;
8use Scalar::Util qw( looks_like_number weaken );
9use Devel::GlobalDestruction;
10use Carp qw(croak cluck);
11use POSIX qw(ceil);
12BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
13
14our $VERSION = '1.22'; # VERSION
15
16use namespace::clean;
17
18use constant {
19    _ST_CLOSED => 0,
20    _ST_OPENING => 1,
21    _ST_OPEN => 2,
22};
23
24sub new {
25    my $class = shift;
26
27    my $self = bless {
28        on_close       => sub {},
29        @_,    # id, connection, on_return, on_close, on_inactive, on_active
30        _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
31        _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
32    }, $class;
33    weaken($self->{connection});
34    return $self->_reset;
35}
36
37sub _reset {
38    my $self = shift;
39
40    my %a = (
41        _state         => _ST_CLOSED,
42        _is_active     => 0,
43        _is_confirm    => 0,
44        _publish_tag   => 0,
45        _publish_cbs   => {},  # values: [on_ack, on_nack, on_return]
46        _consumer_cbs  => {},  # values: [on_consume, on_cancel...]
47    );
48    @$self{keys %a} = values %a;
49
50    return $self;
51}
52
53sub id {
54    my $self = shift;
55    return $self->{id};
56}
57
58sub is_open {
59    my $self = shift;
60    return $self->{_state} == _ST_OPEN;
61}
62
63sub is_active {
64    my $self = shift;
65    return $self->{_is_active};
66}
67
68sub is_confirm {
69    my $self = shift;
70    return $self->{_is_confirm};
71}
72
73sub queue {
74    my $self = shift;
75    return $self->{_queue};
76}
77
78sub open {
79    my $self = shift;
80    my %args = @_;
81
82    if ($self->{_state} != _ST_CLOSED) {
83        $args{on_failure}->('Channel has already been opened');
84        return $self;
85    }
86
87    $self->{_state} = _ST_OPENING;
88
89    $self->{connection}->_push_write_and_read(
90        'Channel::Open', {}, 'Channel::OpenOk',
91        sub {
92            $self->{_state} = _ST_OPEN;
93            $self->{_is_active} = 1;
94            $args{on_success}->($self);
95        },
96        sub {
97	    $self->{_state} = _ST_CLOSED;
98            $args{on_failure}->($self);
99        },
100        $self->{id},
101    );
102
103    return $self;
104}
105
106sub close {
107    my $self = shift;
108    my $connection = $self->{connection}
109        or return;
110    my %args = $connection->_set_cbs(@_);
111
112    # If open in in progess, wait for it; 1s arbitrary timing.
113
114    weaken(my $wself = $self);
115    my $t; $t = AE::timer 0, 1, sub {
116	(my $self = $wself) or undef $t, return;
117	return if $self->{_state} == _ST_OPENING;
118
119	# No more tests are required
120	undef $t;
121
122        # Double close is OK
123	if ($self->{_state} == _ST_CLOSED) {
124	    $args{on_success}->($self);
125            return;
126        }
127
128        $connection->_push_write(
129            $self->_close_frame,
130            $self->{id},
131        );
132
133        # The spec says that after a party sends Channel::Close, it MUST
134        # discard all frames for that channel.  So this channel is dead
135        # immediately.
136        $self->_closed();
137
138        $connection->_push_read_and_valid(
139            'Channel::CloseOk',
140            sub {
141                $args{on_success}->($self);
142                $self->_orphan();
143            },
144            sub {
145                $args{on_failure}->(@_);
146                $self->_orphan();
147            },
148            $self->{id},
149        );
150    };
151
152    return $self;
153}
154
155sub _closed {
156    my $self = shift;
157    my ($frame,) = @_;
158    $frame ||= $self->_close_frame();
159
160    return if $self->{_state} == _ST_CLOSED;
161    $self->{_state} = _ST_CLOSED;
162
163    # Perform callbacks for all outstanding commands
164    $self->{_queue}->_flush($frame);
165    $self->{_content_queue}->_flush($frame);
166
167    # Fake nacks of all outstanding publishes
168    $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
169
170    # Report cancelation of all outstanding consumes
171    my @tags = keys %{ $self->{_consumer_cbs} };
172    $self->_canceled($_, $frame) for @tags;
173
174    # Report close to on_close callback
175    { local $@;
176      eval { $self->{on_close}->($frame) };
177      warn "Error in channel on_close callback, ignored:\n  $@  " if $@; }
178
179    # Reset state (partly redundant)
180    $self->_reset;
181
182    return $self;
183}
184
185sub _close_frame {
186    my $self = shift;
187    my ($text,) = @_;
188
189    Net::AMQP::Frame::Method->new(
190        method_frame => Net::AMQP::Protocol::Channel::Close->new(
191            reply_text => $text,
192        ),
193    );
194}
195
196sub _orphan {
197    my $self = shift;
198
199    if (my $connection = $self->{connection}) {
200        $connection->_delete_channel($self);
201    }
202    return $self;
203}
204
205sub declare_exchange {
206    my $self = shift;
207    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
208
209    return $self if !$self->_check_open($failure_cb);
210
211    $self->{connection}->_push_write_and_read(
212        'Exchange::Declare',
213        {
214            type        => 'direct',
215            passive     => 0,
216            durable     => 0,
217            auto_delete => 0,
218            internal    => 0,
219            %args, # exchange
220            ticket      => 0,
221            nowait      => 0, # FIXME
222        },
223        'Exchange::DeclareOk',
224        $cb,
225        $failure_cb,
226        $self->{id},
227    );
228
229    return $self;
230}
231
232sub bind_exchange {
233    my $self = shift;
234    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
235
236    return $self if !$self->_check_open($failure_cb);
237
238    $self->{connection}->_push_write_and_read(
239        'Exchange::Bind',
240        {
241            %args, # source, destination, routing_key
242            ticket      => 0,
243            nowait      => 0, # FIXME
244        },
245        'Exchange::BindOk',
246        $cb,
247        $failure_cb,
248        $self->{id},
249    );
250
251    return $self;
252}
253
254sub unbind_exchange {
255    my $self = shift;
256    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
257
258    return $self if !$self->_check_open($failure_cb);
259
260    $self->{connection}->_push_write_and_read(
261        'Exchange::Unbind',
262        {
263            %args, # source, destination, routing_key
264            ticket      => 0,
265            nowait      => 0, # FIXME
266        },
267        'Exchange::UnbindOk',
268        $cb,
269        $failure_cb,
270        $self->{id},
271    );
272
273    return $self;
274}
275
276sub delete_exchange {
277    my $self = shift;
278    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
279
280    return $self if !$self->_check_open($failure_cb);
281
282    $self->{connection}->_push_write_and_read(
283        'Exchange::Delete',
284        {
285            if_unused => 0,
286            %args, # exchange
287            ticket    => 0,
288            nowait    => 0, # FIXME
289        },
290        'Exchange::DeleteOk',
291        $cb,
292        $failure_cb,
293        $self->{id},
294    );
295
296    return $self;
297}
298
299sub declare_queue {
300    my $self = shift;
301    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
302
303    return $self if !$self->_check_open($failure_cb);
304
305    $self->{connection}->_push_write_and_read(
306        'Queue::Declare',
307        {
308            queue       => '',
309            passive     => 0,
310            durable     => 0,
311            exclusive   => 0,
312            auto_delete => 0,
313            no_ack      => 1,
314            %args,
315            ticket      => 0,
316            nowait      => 0, # FIXME
317        },
318        'Queue::DeclareOk',
319        $cb,
320        $failure_cb,
321        $self->{id},
322    );
323}
324
325sub bind_queue {
326    my $self = shift;
327    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
328
329    return $self if !$self->_check_open($failure_cb);
330
331    $self->{connection}->_push_write_and_read(
332        'Queue::Bind',
333        {
334            %args, # queue, exchange, routing_key
335            ticket => 0,
336            nowait => 0, # FIXME
337        },
338        'Queue::BindOk',
339        $cb,
340        $failure_cb,
341        $self->{id},
342    );
343
344    return $self;
345}
346
347sub unbind_queue {
348    my $self = shift;
349    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
350
351    return $self if !$self->_check_open($failure_cb);
352
353    $self->{connection}->_push_write_and_read(
354        'Queue::Unbind',
355        {
356            %args, # queue, exchange, routing_key
357            ticket => 0,
358        },
359        'Queue::UnbindOk',
360        $cb,
361        $failure_cb,
362        $self->{id},
363    );
364
365    return $self;
366}
367
368sub purge_queue {
369    my $self = shift;
370    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
371
372    return $self if !$self->_check_open($failure_cb);
373
374    $self->{connection}->_push_write_and_read(
375        'Queue::Purge',
376        {
377            %args, # queue
378            ticket => 0,
379            nowait => 0, # FIXME
380        },
381        'Queue::PurgeOk',
382        $cb,
383        $failure_cb,
384        $self->{id},
385    );
386
387    return $self;
388}
389
390sub delete_queue {
391    my $self = shift;
392    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
393
394    return $self if !$self->_check_open($failure_cb);
395
396    $self->{connection}->_push_write_and_read(
397        'Queue::Delete',
398        {
399            if_unused => 0,
400            if_empty  => 0,
401            %args, # queue
402            ticket    => 0,
403            nowait    => 0, # FIXME
404        },
405        'Queue::DeleteOk',
406        $cb,
407        $failure_cb,
408        $self->{id},
409    );
410
411    return $self;
412}
413
414sub publish {
415    my $self = shift;
416    my %args = @_;
417
418    # Docs should advise channel-level callback over this, but still, better to give user an out
419    unless ($self->{_is_active}) {
420        if (defined $args{on_inactive}) {
421            $args{on_inactive}->();
422            return $self;
423        }
424        croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
425    }
426
427    my $header_args = delete $args{header};
428    my $body        = delete $args{body};
429    my $ack_cb      = delete $args{on_ack};
430    my $nack_cb     = delete $args{on_nack};
431    my $return_cb   = delete $args{on_return};
432
433    defined($header_args) or $header_args = {};
434    defined($body) or $body = '';
435    if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
436        cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
437            unless $self->{_is_confirm};
438    }
439
440    my $tag;
441    if ($self->{_is_confirm}) {
442        # yeah, delivery tags in acks are sequential.  see Java client
443        $tag = ++$self->{_publish_tag};
444        if ($return_cb) {
445            $header_args = { %$header_args };
446            $header_args->{headers}->{_ar_return} = $tag;  # just reuse the same value, why not
447        }
448        $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
449    }
450
451    $self->_publish(
452        %args,
453    )->_header(
454        $header_args, $body,
455    )->_body(
456        $body,
457    );
458
459    return $self;
460}
461
462sub _publish {
463    my $self = shift;
464    my %args = @_;
465
466    $self->{connection}->_push_write(
467        Net::AMQP::Protocol::Basic::Publish->new(
468            exchange  => '',
469            mandatory => 0,
470            immediate => 0,
471            %args, # routing_key
472            ticket    => 0,
473        ),
474        $self->{id},
475    );
476
477    return $self;
478}
479
480sub _header {
481    my ($self, $args, $body) = @_;
482
483    my $weight = delete $args->{weight} || 0;
484
485    # user-provided message headers must be strings.  protect values that look like numbers.
486    my $headers = $args->{headers} || {};
487    my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
488    if (@prot) {
489        $headers = {
490            %$headers,
491            map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
492        };
493    }
494
495    $self->{connection}->_push_write(
496        Net::AMQP::Frame::Header->new(
497            weight       => $weight,
498            body_size    => length($body),
499            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
500                content_type     => 'application/octet-stream',
501                content_encoding => undef,
502                delivery_mode    => 1,
503                priority         => 1,
504                correlation_id   => undef,
505                expiration       => undef,
506                message_id       => undef,
507                timestamp        => time,
508                type             => undef,
509                user_id          => $self->{connection}->login_user,
510                app_id           => undef,
511                cluster_id       => undef,
512                %$args,
513                headers          => $headers,
514            ),
515        ),
516        $self->{id},
517    );
518
519    return $self;
520}
521
522sub _body {
523    my ($self, $body,) = @_;
524
525    my $body_max = $self->{connection}->{_body_max} || length $body;
526
527    # chunk up body into segments measured by $frame_max
528    while (length $body) {
529        $self->{connection}->_push_write(
530            Net::AMQP::Frame::Body->new(
531                payload => substr($body, 0, $body_max, '')),
532            $self->{id}
533        );
534    }
535
536    return $self;
537}
538
539sub consume {
540    my $self = shift;
541    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
542
543    return $self if !$self->_check_open($failure_cb);
544
545    my $consumer_cb = delete $args{on_consume}  || sub {};
546    my $cancel_cb   = delete $args{on_cancel}   || sub {};
547    my $no_ack      = delete $args{no_ack}      // 1;
548
549    $self->{connection}->_push_write_and_read(
550        'Basic::Consume',
551        {
552            consumer_tag => '',
553            no_local     => 0,
554            no_ack       => $no_ack,
555            exclusive    => 0,
556
557            %args, # queue
558            ticket       => 0,
559            nowait       => 0, # FIXME
560        },
561        'Basic::ConsumeOk',
562        sub {
563            my $frame = shift;
564            my $tag = $frame->method_frame->consumer_tag;
565            $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
566            $cb->($frame);
567        },
568        $failure_cb,
569        $self->{id},
570    );
571
572    return $self;
573}
574
575sub cancel {
576    my $self = shift;
577    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
578
579    return $self if !$self->_check_open($failure_cb);
580
581    if (!defined $args{consumer_tag}) {
582        $failure_cb->('consumer_tag is not set');
583        return $self;
584    }
585
586    my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}};
587    unless ($cons_cbs) {
588        $failure_cb->('Unknown consumer_tag');
589        return $self;
590    }
591    push @$cons_cbs, $cb;
592
593    $self->{connection}->_push_write(
594        Net::AMQP::Protocol::Basic::Cancel->new(
595            %args, # consumer_tag
596            nowait => 0,
597        ),
598        $self->{id},
599    );
600
601    return $self;
602}
603
604sub _canceled {
605    my $self = shift;
606    my ($tag, $frame,) = @_;
607
608    my $cons_cbs = delete $self->{_consumer_cbs}->{$tag}
609      or return 0;
610
611    shift @$cons_cbs; # no more deliveries
612    for my $cb (reverse @$cons_cbs) {
613        $cb->($frame);
614    }
615    return 1;
616}
617
618sub get {
619    my $self = shift;
620    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
621
622    my $no_ack = delete $args{no_ack} // 1;
623
624    return $self if !$self->_check_open($failure_cb);
625
626    $self->{connection}->_push_write_and_read(
627        'Basic::Get',
628        {
629            no_ack => $no_ack,
630            %args, # queue
631            ticket => 0,
632        },
633        [qw(Basic::GetOk Basic::GetEmpty)],
634        sub {
635            my $frame = shift;
636            return $cb->({empty => $frame})
637                if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
638            $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
639        },
640        $failure_cb,
641        $self->{id},
642    );
643
644    return $self;
645}
646
647sub ack {
648    my $self = shift;
649    my %args = @_;
650
651    return $self if !$self->_check_open(sub {});
652
653    $self->{connection}->_push_write(
654        Net::AMQP::Protocol::Basic::Ack->new(
655            delivery_tag => 0,
656            multiple     => (
657                defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
658            ),
659            %args,
660        ),
661        $self->{id},
662    );
663
664    return $self;
665}
666
667sub qos {
668    my $self = shift;
669    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
670
671    return $self if !$self->_check_open($failure_cb);
672
673    $self->{connection}->_push_write_and_read(
674        'Basic::Qos',
675        {
676            prefetch_count => 1,
677            prefetch_size  => 0,
678            global         => 0,
679            %args,
680        },
681        'Basic::QosOk',
682        $cb,
683        $failure_cb,
684        $self->{id},
685    );
686
687    return $self;
688}
689
690sub confirm {
691    my $self = shift;
692    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
693
694    return $self if !$self->_check_open($failure_cb);
695    return $self if !$self->_check_version(0, 9, $failure_cb);
696
697    weaken(my $wself = $self);
698
699    $self->{connection}->_push_write_and_read(
700        'Confirm::Select',
701        {
702            %args,
703            nowait       => 0, # FIXME
704        },
705        'Confirm::SelectOk',
706        sub {
707            my $me = $wself or return;
708            $me->{_is_confirm} = 1;
709            $cb->();
710        },
711        $failure_cb,
712        $self->{id},
713    );
714
715    return $self;
716}
717
718sub recover {
719    my $self = shift;
720    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
721
722    return $self if !$self->_check_open(sub {});
723
724    $self->{connection}->_push_write(
725        Net::AMQP::Protocol::Basic::Recover->new(
726            requeue => 1,
727            %args,
728        ),
729        $self->{id},
730    );
731
732     if (!$args{nowait} && $self->_check_version(0, 9)) {
733        $self->{connection}->_push_read_and_valid(
734            'Basic::RecoverOk',
735            $cb,
736            $failure_cb,
737            $self->{id},
738        );
739    }
740    else {
741        $cb->();
742    }
743
744    return $self;
745}
746
747sub reject {
748    my $self = shift;
749    my %args = @_;
750
751    return $self if !$self->_check_open( sub { } );
752
753    $self->{connection}->_push_write(
754        Net::AMQP::Protocol::Basic::Reject->new(
755            delivery_tag => 0,
756            requeue      => 0,
757            %args,
758        ),
759        $self->{id},
760    );
761
762    return $self;
763}
764
765sub select_tx {
766    my $self = shift;
767    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
768
769    return $self if !$self->_check_open($failure_cb);
770
771    $self->{connection}->_push_write_and_read(
772        'Tx::Select', {}, 'Tx::SelectOk',
773        $cb,
774        $failure_cb,
775        $self->{id},
776    );
777
778    return $self;
779}
780
781sub commit_tx {
782    my $self = shift;
783    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
784
785    return $self if !$self->_check_open($failure_cb);
786
787    $self->{connection}->_push_write_and_read(
788        'Tx::Commit', {}, 'Tx::CommitOk',
789        $cb,
790        $failure_cb,
791        $self->{id},
792    );
793
794    return $self;
795}
796
797sub rollback_tx {
798    my $self = shift;
799    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
800
801    return $self if !$self->_check_open($failure_cb);
802
803    $self->{connection}->_push_write_and_read(
804        'Tx::Rollback', {}, 'Tx::RollbackOk',
805        $cb,
806        $failure_cb,
807        $self->{id},
808    );
809
810    return $self;
811}
812
813sub push_queue_or_consume {
814    my $self = shift;
815    my ($frame, $failure_cb,) = @_;
816
817    # Note: the spec says that after a party sends Channel::Close, it MUST
818    # discard all frames for that channel other than Close and CloseOk.
819
820    if ($frame->isa('Net::AMQP::Frame::Method')) {
821        my $method_frame = $frame->method_frame;
822        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
823            $self->{connection}->_push_write(
824                Net::AMQP::Protocol::Channel::CloseOk->new(),
825                $self->{id},
826            );
827            $self->_closed($frame);
828            $self->_orphan();
829            return $self;
830        } elsif ($self->{_state} != _ST_OPEN) {
831            if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
832                $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
833                $self->{_queue}->push($frame);
834            }
835            return $self;
836        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
837            my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
838            my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
839            $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
840            return $self;
841        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
842                 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
843            # CancelOk means we asked for a cancel.
844            # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
845            if (!$self->_canceled($method_frame->consumer_tag, $frame)
846                  && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
847                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
848            }
849            return $self;
850        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
851            weaken(my $wself = $self);
852            my $cb = sub {
853                my $ret = shift;
854                my $me = $wself or return;
855                my $headers = $ret->{header}->headers || {};
856                my $onret_cb;
857                if (defined(my $tag = $headers->{_ar_return})) {
858                    my $cbs = $me->{_publish_cbs}->{$tag};
859                    $onret_cb = $cbs->[2] if $cbs;
860                }
861                $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well
862                $onret_cb->($frame);
863            };
864            $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
865            return $self;
866        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
867                 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
868            (my $resp = ref($method_frame)) =~ s/.*:://;
869            my $cbs;
870            if (!$self->{_is_confirm}) {
871                $failure_cb->("Received $resp when not in confirm mode");
872            }
873            else {
874                my @tags;
875                if ($method_frame->{multiple}) {
876                    @tags = sort { $a <=> $b }
877                              grep { $_ <= $method_frame->{delivery_tag} }
878                                keys %{$self->{_publish_cbs}};
879                }
880                else {
881                    @tags = ($method_frame->{delivery_tag});
882                }
883                my $cbi = ($resp eq 'Ack') ? 0 : 1;
884                for my $tag (@tags) {
885                    my $cbs;
886                    if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
887                        $failure_cb->("Received $resp of unknown delivery tag $tag");
888                    }
889                    elsif ($cbs->[$cbi]) {
890                        $cbs->[$cbi]->($frame);
891                    }
892                }
893            }
894            return $self;
895        } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
896            $self->{_is_active} = $method_frame->active;
897            $self->{connection}->_push_write(
898                Net::AMQP::Protocol::Channel::FlowOk->new(
899                    active => $method_frame->active,
900                ),
901                $self->{id},
902            );
903            my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
904            my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
905            $cb->($frame);
906            return $self;
907        }
908        $self->{_queue}->push($frame);
909    } else {
910        $self->{_content_queue}->push($frame);
911    }
912
913    return $self;
914}
915
916sub _push_read_header_and_body {
917    my $self = shift;
918    my ($type, $frame, $cb, $failure_cb,) = @_;
919    my $response = {$type => $frame};
920    my $body_size = 0;
921    my $body_payload = "";
922
923    weaken(my $wcontq = $self->{_content_queue});
924    my $w_body_frame;
925    my $body_frame = sub {
926        my $frame = shift;
927
928        return $failure_cb->('Received data is not body frame')
929            if !$frame->isa('Net::AMQP::Frame::Body');
930
931        $body_payload .= $frame->payload;
932
933        if (length($body_payload) < $body_size) {
934            # More to come
935            my $contq = $wcontq or return;
936            $contq->get($w_body_frame);
937        }
938        else {
939            $frame->payload($body_payload);
940            $response->{body} = $frame;
941            $cb->($response);
942        }
943    };
944    $w_body_frame = $body_frame;
945    weaken($w_body_frame);
946
947    $self->{_content_queue}->get(sub{
948        my $frame = shift;
949
950        return $failure_cb->('Received data is not header frame')
951            if !$frame->isa('Net::AMQP::Frame::Header');
952
953        my $header_frame = $frame->header_frame;
954        return $failure_cb->(
955              'Header is not Protocol::Basic::ContentHeader'
956            . 'Header was ' . ref $header_frame
957        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
958
959        $response->{header} = $header_frame;
960
961        $body_size = $frame->body_size;
962        if ( $body_size ) {
963            my $contq = $wcontq or return;
964            $contq->get($body_frame);
965        } else {
966            $response->{body} = undef;
967            $cb->($response);
968        }
969    });
970
971    return $self;
972}
973
974sub _delete_cbs {
975    my $self = shift;
976    my %args = @_;
977
978    my $cb         = delete $args{on_success} || sub {};
979    my $failure_cb = delete $args{on_failure} || sub {die @_};
980
981    return $cb, $failure_cb, %args;
982}
983
984sub _check_open {
985    my $self = shift;
986    my ($failure_cb) = @_;
987
988    return 1 if $self->is_open();
989
990    $failure_cb->('Channel has already been closed');
991    return 0;
992}
993
994sub _check_version {
995    my $self = shift;
996    my ($major, $minor, $failure_cb) = @_;
997
998    my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
999    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
1000
1001    return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
1002
1003    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
1004    return 0;
1005}
1006
1007sub DESTROY {
1008    my $self = shift;
1009    $self->close() if !in_global_destruction && $self->is_open();
1010    return;
1011}
1012
10131;
1014__END__
1015
1016=head1 NAME
1017
1018AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
1019
1020=head1 SYNOPSIS
1021
1022    my $ch = $rf->open_channel();
1023    $ch->declare_exchange(exchange => 'test_exchange');
1024
1025=head1 DESCRIPTION
1026
1027A RabbitMQ channel.
1028
1029A channel is a light-weight virtual connection within a TCP connection to a
1030RabbitMQ broker.
1031
1032=head1 ARGUMENTS FOR C<open_channel>
1033
1034=over
1035
1036=item on_close
1037
1038Callback invoked when the channel closes.  Callback will be passed the
1039incoming message that caused the close, if any.
1040
1041=item on_return
1042
1043Callback invoked when a mandatory or immediate message publish fails.
1044Callback will be passed the incoming message, with accessors
1045C<method_frame>, C<header_frame>, and C<body_frame>.
1046
1047=back
1048
1049=head1 METHODS
1050
1051=head2 declare_exchange (%args)
1052
1053Declare an exchange (to publish messages to) on the server.
1054
1055Arguments:
1056
1057=over
1058
1059=item on_success
1060
1061=item on_failure
1062
1063=item type
1064
1065Default 'direct'
1066
1067=item passive
1068
1069Default 0
1070
1071=item durable
1072
1073Default 0
1074
1075=item auto_delete
1076
1077Default 0
1078
1079=item internal
1080
1081Default 0
1082
1083=item exchange
1084
1085The name of the exchange
1086
1087=back
1088
1089=head2 bind_exchange
1090
1091Binds an exchange to another exchange, with a routing key.
1092
1093Arguments:
1094
1095=over
1096
1097=item source
1098
1099The name of the source exchange to bind
1100
1101=item destination
1102
1103The name of the destination exchange to bind
1104
1105=item routing_key
1106
1107The routing key to bind with
1108
1109=back
1110
1111=head2 unbind_exchange
1112
1113=head2 delete_exchange
1114
1115=head2 declare_queue
1116
1117Declare a queue (create it if it doesn't exist yet) for publishing messages
1118to on the server.
1119
1120  my $done    = AnyEvent->condvar;
1121  $channel->declare_queue(
1122     exchange    => $queue_exchange,
1123     queue       => $queueName,
1124     durable     => 0,
1125     auto_delete => 1,
1126     passive     => 0,
1127     arguments   => { 'x-expires' => 0, },
1128     on_success  => sub { $done->send; },
1129     on_failure  => sub {
1130         say "Unable to create queue $queueName";
1131         $done->send;
1132     },
1133  );
1134  $done->recv;
1135
1136Arguments:
1137
1138=over
1139
1140=item queue
1141
1142Name of the queue to be declared. If the queue name is the empty string,
1143RabbitMQ will create a unique name for the queue. This is useful for
1144temporary/private reply queues.
1145
1146=item on_success
1147
1148Callback that is called when the queue was declared successfully. The argument
1149to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the
1150Queue (if you declared it with an empty name), you can say
1151
1152    on_success => sub {
1153        my $method = shift;
1154        my $name   = $method->method_frame->queue;
1155    };
1156
1157=item on_failure
1158
1159Callback that is called when the declaration of the queue has failed.
1160
1161=item auto_delete
1162
11630 or 1, default 0
1164
1165=item passive
1166
11670 or 1, default 0
1168
1169=item durable
1170
11710 or 1, default 0
1172
1173=item exclusive
1174
11750 or 1, default 0
1176
1177=item no_ack
1178
11790 or 1, default 1
1180
1181=item ticket
1182
1183default 0
1184
1185=for comment
1186XXX Is "exchange" a valid parameter?
1187
1188=item arguments
1189
1190C<arguments> is a hashref of additional parameters which RabbitMQ extensions
1191may use. This list is not complete and your RabbitMQ server configuration will
1192determine which arguments are valid and how they act.
1193
1194=over
1195
1196=item x-expires
1197
1198The queue will automatically be removed after being idle for this many milliseconds.
1199
1200Default of 0 disables automatic queue removal.
1201
1202=back
1203
1204=back
1205
1206=head2 bind_queue
1207
1208Binds a queue to an exchange, with a routing key.
1209
1210Arguments:
1211
1212=over
1213
1214=item queue
1215
1216The name of the queue to bind
1217
1218=item exchange
1219
1220The name of the exchange to bind
1221
1222=item routing_key
1223
1224The routing key to bind with
1225
1226=back
1227
1228=head2 unbind_queue
1229
1230=head2 purge_queue
1231
1232Flushes the contents of a queue.
1233
1234=head2 delete_queue
1235
1236Deletes a queue. The queue may not have any active consumers.
1237
1238=head2 consume
1239
1240Subscribe to consume messages from a queue.
1241
1242Arguments:
1243
1244=over
1245
1246=item queue
1247
1248The name of the queue to be consumed from.
1249
1250=item on_consume
1251
1252Callback called with an argument of the message which has been consumed.
1253
1254The message is a hash reference, where the value to key C<header> is an object
1255of type L<Net::AMQP::Protocol::Basic::ContentHeader>, L<body> is a
1256L<Net::AMQP::Frame::Body>, and C<deliver> a L<Net::AMQP::Frame::Method>.
1257
1258=item on_cancel
1259
1260Callback called if consumption is cancelled.  This may be at client request
1261or as a side effect of queue deletion.  (Notification of queue deletion is a
1262RabbitMQ extension.)
1263
1264=item consumer_tag
1265
1266Identifies this consumer, will be auto-generated if you do not provide it, but you must
1267supply a value if you want to be able to later cancel the subscription.
1268
1269=item on_success
1270
1271Callback called if the subscription was successful (before the first message is consumed).
1272
1273=item on_failure
1274
1275Callback called if the subscription fails for any reason.
1276
1277=item no_ack
1278
1279Pass through the C<no_ack> flag. Defaults to C<1>. If set to C<1>, the server
1280will not expect messages to be acknowledged.
1281
1282=back
1283
1284=head2 publish
1285
1286Publish a message to an exchange.
1287
1288Arguments:
1289
1290=over
1291
1292=item exchange
1293
1294The name of the exchange to send the message to.
1295
1296=item routing_key
1297
1298The routing key with which to publish the message.
1299
1300=item header
1301
1302Hash of AMQP message header info, including the confusingly similar element "headers",
1303which may contain arbitrary string key/value pairs.
1304
1305=item body
1306
1307The text body of the message to send.
1308
1309=item mandatory
1310
1311Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
1312bindings), it will be "returned."  (see "on_return")
1313
1314=item immediate
1315
1316Boolean; if true, then if the message cannot be delivered directly to a consumer, it
1317will be "returned."  (see "on_return")
1318
1319=item on_ack
1320
1321Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
1322typically L<Net::AMQP::Protocol::Basic::Ack>.
1323
1324=item on_nack
1325
1326Callback called with the frame that declines receipt (if the channel is in confirm mode),
1327typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.
1328
1329=item on_return
1330
1331In AMQP, a "returned" message is one that cannot be delivered in compliance with the
1332C<immediate> or C<mandatory> flags.
1333
1334If in confirm mode, this callback will be called with the frame that reports message
1335return, typically L<Net::AMQP::Protocol::Basic::Return>.  If confirm mode is off or
1336this callback is not provided, then the channel or connection objects' on_return
1337callbacks (if any), will be called instead.
1338
1339NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
1340not on_return is called first.
1341
1342=back
1343
1344=head2 cancel
1345
1346Cancel a queue subscription.
1347
1348Note that the cancellation B<will not> take place at once, and further messages may be
1349consumed before the subscription is cancelled. No further messages will be
1350consumed after the on_success callback has been called.
1351
1352Arguments:
1353
1354=over
1355
1356=item consumer_tag
1357
1358Identifies this consumer, needs to be the value supplied when the queue is initially
1359consumed from.
1360
1361=item on_success
1362
1363Callback called if the subscription was successfully cancelled.
1364
1365=item on_failure
1366
1367Callback called if the subscription could not be cancelled for any reason.
1368
1369=back
1370
1371=head2 get
1372
1373Try to get a single message from a queue.
1374
1375Arguments:
1376
1377=over
1378
1379=item queue
1380
1381Mandatory. Name of the queue to try to receive a message from.
1382
1383=item on_success
1384
1385Will be called either with either a message, or, if the queue is empty,
1386a notification that there was nothing to collect from the queue.
1387
1388=item on_failure
1389
1390This callback will be called if an error is signalled on this channel.
1391
1392=item no_ack
1393
13940 or 1, default 1
1395
1396=back
1397
1398=head2 ack
1399
1400=head2 qos
1401
1402=head2 confirm
1403
1404Put channel into confirm mode.  In confirm mode, publishes are confirmed by
1405the server, so the on_ack callback of publish works.
1406
1407=head2 recover
1408
1409=head2 select_tx
1410
1411=head2 commit_tx
1412
1413=head2 rollback_tx
1414
1415=head1 AUTHOR, COPYRIGHT AND LICENSE
1416
1417See L<AnyEvent::RabbitMQ> for author(s), copyright and license.
1418
1419=cut
1420