1package AnyEvent::RabbitMQ::Channel; 2 3use strict; 4use warnings; 5 6use AnyEvent::RabbitMQ::LocalQueue; 7use AnyEvent; 8use Scalar::Util qw( looks_like_number weaken ); 9use Devel::GlobalDestruction; 10use Carp qw(croak cluck); 11use POSIX qw(ceil); 12BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper } 13 14our $VERSION = '1.22'; # VERSION 15 16use namespace::clean; 17 18use constant { 19 _ST_CLOSED => 0, 20 _ST_OPENING => 1, 21 _ST_OPEN => 2, 22}; 23 24sub new { 25 my $class = shift; 26 27 my $self = bless { 28 on_close => sub {}, 29 @_, # id, connection, on_return, on_close, on_inactive, on_active 30 _queue => AnyEvent::RabbitMQ::LocalQueue->new, 31 _content_queue => AnyEvent::RabbitMQ::LocalQueue->new, 32 }, $class; 33 weaken($self->{connection}); 34 return $self->_reset; 35} 36 37sub _reset { 38 my $self = shift; 39 40 my %a = ( 41 _state => _ST_CLOSED, 42 _is_active => 0, 43 _is_confirm => 0, 44 _publish_tag => 0, 45 _publish_cbs => {}, # values: [on_ack, on_nack, on_return] 46 _consumer_cbs => {}, # values: [on_consume, on_cancel...] 47 ); 48 @$self{keys %a} = values %a; 49 50 return $self; 51} 52 53sub id { 54 my $self = shift; 55 return $self->{id}; 56} 57 58sub is_open { 59 my $self = shift; 60 return $self->{_state} == _ST_OPEN; 61} 62 63sub is_active { 64 my $self = shift; 65 return $self->{_is_active}; 66} 67 68sub is_confirm { 69 my $self = shift; 70 return $self->{_is_confirm}; 71} 72 73sub queue { 74 my $self = shift; 75 return $self->{_queue}; 76} 77 78sub open { 79 my $self = shift; 80 my %args = @_; 81 82 if ($self->{_state} != _ST_CLOSED) { 83 $args{on_failure}->('Channel has already been opened'); 84 return $self; 85 } 86 87 $self->{_state} = _ST_OPENING; 88 89 $self->{connection}->_push_write_and_read( 90 'Channel::Open', {}, 'Channel::OpenOk', 91 sub { 92 $self->{_state} = _ST_OPEN; 93 $self->{_is_active} = 1; 94 $args{on_success}->($self); 95 }, 96 sub { 97 $self->{_state} = _ST_CLOSED; 98 $args{on_failure}->($self); 99 }, 100 $self->{id}, 101 ); 102 103 return $self; 104} 105 106sub close { 107 my $self = shift; 108 my $connection = $self->{connection} 109 or return; 110 my %args = $connection->_set_cbs(@_); 111 112 # If open in in progess, wait for it; 1s arbitrary timing. 113 114 weaken(my $wself = $self); 115 my $t; $t = AE::timer 0, 1, sub { 116 (my $self = $wself) or undef $t, return; 117 return if $self->{_state} == _ST_OPENING; 118 119 # No more tests are required 120 undef $t; 121 122 # Double close is OK 123 if ($self->{_state} == _ST_CLOSED) { 124 $args{on_success}->($self); 125 return; 126 } 127 128 $connection->_push_write( 129 $self->_close_frame, 130 $self->{id}, 131 ); 132 133 # The spec says that after a party sends Channel::Close, it MUST 134 # discard all frames for that channel. So this channel is dead 135 # immediately. 136 $self->_closed(); 137 138 $connection->_push_read_and_valid( 139 'Channel::CloseOk', 140 sub { 141 $args{on_success}->($self); 142 $self->_orphan(); 143 }, 144 sub { 145 $args{on_failure}->(@_); 146 $self->_orphan(); 147 }, 148 $self->{id}, 149 ); 150 }; 151 152 return $self; 153} 154 155sub _closed { 156 my $self = shift; 157 my ($frame,) = @_; 158 $frame ||= $self->_close_frame(); 159 160 return if $self->{_state} == _ST_CLOSED; 161 $self->{_state} = _ST_CLOSED; 162 163 # Perform callbacks for all outstanding commands 164 $self->{_queue}->_flush($frame); 165 $self->{_content_queue}->_flush($frame); 166 167 # Fake nacks of all outstanding publishes 168 $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} }; 169 170 # Report cancelation of all outstanding consumes 171 my @tags = keys %{ $self->{_consumer_cbs} }; 172 $self->_canceled($_, $frame) for @tags; 173 174 # Report close to on_close callback 175 { local $@; 176 eval { $self->{on_close}->($frame) }; 177 warn "Error in channel on_close callback, ignored:\n $@ " if $@; } 178 179 # Reset state (partly redundant) 180 $self->_reset; 181 182 return $self; 183} 184 185sub _close_frame { 186 my $self = shift; 187 my ($text,) = @_; 188 189 Net::AMQP::Frame::Method->new( 190 method_frame => Net::AMQP::Protocol::Channel::Close->new( 191 reply_text => $text, 192 ), 193 ); 194} 195 196sub _orphan { 197 my $self = shift; 198 199 if (my $connection = $self->{connection}) { 200 $connection->_delete_channel($self); 201 } 202 return $self; 203} 204 205sub declare_exchange { 206 my $self = shift; 207 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 208 209 return $self if !$self->_check_open($failure_cb); 210 211 $self->{connection}->_push_write_and_read( 212 'Exchange::Declare', 213 { 214 type => 'direct', 215 passive => 0, 216 durable => 0, 217 auto_delete => 0, 218 internal => 0, 219 %args, # exchange 220 ticket => 0, 221 nowait => 0, # FIXME 222 }, 223 'Exchange::DeclareOk', 224 $cb, 225 $failure_cb, 226 $self->{id}, 227 ); 228 229 return $self; 230} 231 232sub bind_exchange { 233 my $self = shift; 234 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 235 236 return $self if !$self->_check_open($failure_cb); 237 238 $self->{connection}->_push_write_and_read( 239 'Exchange::Bind', 240 { 241 %args, # source, destination, routing_key 242 ticket => 0, 243 nowait => 0, # FIXME 244 }, 245 'Exchange::BindOk', 246 $cb, 247 $failure_cb, 248 $self->{id}, 249 ); 250 251 return $self; 252} 253 254sub unbind_exchange { 255 my $self = shift; 256 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 257 258 return $self if !$self->_check_open($failure_cb); 259 260 $self->{connection}->_push_write_and_read( 261 'Exchange::Unbind', 262 { 263 %args, # source, destination, routing_key 264 ticket => 0, 265 nowait => 0, # FIXME 266 }, 267 'Exchange::UnbindOk', 268 $cb, 269 $failure_cb, 270 $self->{id}, 271 ); 272 273 return $self; 274} 275 276sub delete_exchange { 277 my $self = shift; 278 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 279 280 return $self if !$self->_check_open($failure_cb); 281 282 $self->{connection}->_push_write_and_read( 283 'Exchange::Delete', 284 { 285 if_unused => 0, 286 %args, # exchange 287 ticket => 0, 288 nowait => 0, # FIXME 289 }, 290 'Exchange::DeleteOk', 291 $cb, 292 $failure_cb, 293 $self->{id}, 294 ); 295 296 return $self; 297} 298 299sub declare_queue { 300 my $self = shift; 301 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 302 303 return $self if !$self->_check_open($failure_cb); 304 305 $self->{connection}->_push_write_and_read( 306 'Queue::Declare', 307 { 308 queue => '', 309 passive => 0, 310 durable => 0, 311 exclusive => 0, 312 auto_delete => 0, 313 no_ack => 1, 314 %args, 315 ticket => 0, 316 nowait => 0, # FIXME 317 }, 318 'Queue::DeclareOk', 319 $cb, 320 $failure_cb, 321 $self->{id}, 322 ); 323} 324 325sub bind_queue { 326 my $self = shift; 327 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 328 329 return $self if !$self->_check_open($failure_cb); 330 331 $self->{connection}->_push_write_and_read( 332 'Queue::Bind', 333 { 334 %args, # queue, exchange, routing_key 335 ticket => 0, 336 nowait => 0, # FIXME 337 }, 338 'Queue::BindOk', 339 $cb, 340 $failure_cb, 341 $self->{id}, 342 ); 343 344 return $self; 345} 346 347sub unbind_queue { 348 my $self = shift; 349 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 350 351 return $self if !$self->_check_open($failure_cb); 352 353 $self->{connection}->_push_write_and_read( 354 'Queue::Unbind', 355 { 356 %args, # queue, exchange, routing_key 357 ticket => 0, 358 }, 359 'Queue::UnbindOk', 360 $cb, 361 $failure_cb, 362 $self->{id}, 363 ); 364 365 return $self; 366} 367 368sub purge_queue { 369 my $self = shift; 370 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 371 372 return $self if !$self->_check_open($failure_cb); 373 374 $self->{connection}->_push_write_and_read( 375 'Queue::Purge', 376 { 377 %args, # queue 378 ticket => 0, 379 nowait => 0, # FIXME 380 }, 381 'Queue::PurgeOk', 382 $cb, 383 $failure_cb, 384 $self->{id}, 385 ); 386 387 return $self; 388} 389 390sub delete_queue { 391 my $self = shift; 392 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 393 394 return $self if !$self->_check_open($failure_cb); 395 396 $self->{connection}->_push_write_and_read( 397 'Queue::Delete', 398 { 399 if_unused => 0, 400 if_empty => 0, 401 %args, # queue 402 ticket => 0, 403 nowait => 0, # FIXME 404 }, 405 'Queue::DeleteOk', 406 $cb, 407 $failure_cb, 408 $self->{id}, 409 ); 410 411 return $self; 412} 413 414sub publish { 415 my $self = shift; 416 my %args = @_; 417 418 # Docs should advise channel-level callback over this, but still, better to give user an out 419 unless ($self->{_is_active}) { 420 if (defined $args{on_inactive}) { 421 $args{on_inactive}->(); 422 return $self; 423 } 424 croak "Can't publish on inactive channel (server flow control); provide on_inactive callback"; 425 } 426 427 my $header_args = delete $args{header}; 428 my $body = delete $args{body}; 429 my $ack_cb = delete $args{on_ack}; 430 my $nack_cb = delete $args{on_nack}; 431 my $return_cb = delete $args{on_return}; 432 433 defined($header_args) or $header_args = {}; 434 defined($body) or $body = ''; 435 if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) { 436 cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode" 437 unless $self->{_is_confirm}; 438 } 439 440 my $tag; 441 if ($self->{_is_confirm}) { 442 # yeah, delivery tags in acks are sequential. see Java client 443 $tag = ++$self->{_publish_tag}; 444 if ($return_cb) { 445 $header_args = { %$header_args }; 446 $header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not 447 } 448 $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb]; 449 } 450 451 $self->_publish( 452 %args, 453 )->_header( 454 $header_args, $body, 455 )->_body( 456 $body, 457 ); 458 459 return $self; 460} 461 462sub _publish { 463 my $self = shift; 464 my %args = @_; 465 466 $self->{connection}->_push_write( 467 Net::AMQP::Protocol::Basic::Publish->new( 468 exchange => '', 469 mandatory => 0, 470 immediate => 0, 471 %args, # routing_key 472 ticket => 0, 473 ), 474 $self->{id}, 475 ); 476 477 return $self; 478} 479 480sub _header { 481 my ($self, $args, $body) = @_; 482 483 my $weight = delete $args->{weight} || 0; 484 485 # user-provided message headers must be strings. protect values that look like numbers. 486 my $headers = $args->{headers} || {}; 487 my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers; 488 if (@prot) { 489 $headers = { 490 %$headers, 491 map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot 492 }; 493 } 494 495 $self->{connection}->_push_write( 496 Net::AMQP::Frame::Header->new( 497 weight => $weight, 498 body_size => length($body), 499 header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( 500 content_type => 'application/octet-stream', 501 content_encoding => undef, 502 delivery_mode => 1, 503 priority => 1, 504 correlation_id => undef, 505 expiration => undef, 506 message_id => undef, 507 timestamp => time, 508 type => undef, 509 user_id => $self->{connection}->login_user, 510 app_id => undef, 511 cluster_id => undef, 512 %$args, 513 headers => $headers, 514 ), 515 ), 516 $self->{id}, 517 ); 518 519 return $self; 520} 521 522sub _body { 523 my ($self, $body,) = @_; 524 525 my $body_max = $self->{connection}->{_body_max} || length $body; 526 527 # chunk up body into segments measured by $frame_max 528 while (length $body) { 529 $self->{connection}->_push_write( 530 Net::AMQP::Frame::Body->new( 531 payload => substr($body, 0, $body_max, '')), 532 $self->{id} 533 ); 534 } 535 536 return $self; 537} 538 539sub consume { 540 my $self = shift; 541 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 542 543 return $self if !$self->_check_open($failure_cb); 544 545 my $consumer_cb = delete $args{on_consume} || sub {}; 546 my $cancel_cb = delete $args{on_cancel} || sub {}; 547 my $no_ack = delete $args{no_ack} // 1; 548 549 $self->{connection}->_push_write_and_read( 550 'Basic::Consume', 551 { 552 consumer_tag => '', 553 no_local => 0, 554 no_ack => $no_ack, 555 exclusive => 0, 556 557 %args, # queue 558 ticket => 0, 559 nowait => 0, # FIXME 560 }, 561 'Basic::ConsumeOk', 562 sub { 563 my $frame = shift; 564 my $tag = $frame->method_frame->consumer_tag; 565 $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ]; 566 $cb->($frame); 567 }, 568 $failure_cb, 569 $self->{id}, 570 ); 571 572 return $self; 573} 574 575sub cancel { 576 my $self = shift; 577 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 578 579 return $self if !$self->_check_open($failure_cb); 580 581 if (!defined $args{consumer_tag}) { 582 $failure_cb->('consumer_tag is not set'); 583 return $self; 584 } 585 586 my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}}; 587 unless ($cons_cbs) { 588 $failure_cb->('Unknown consumer_tag'); 589 return $self; 590 } 591 push @$cons_cbs, $cb; 592 593 $self->{connection}->_push_write( 594 Net::AMQP::Protocol::Basic::Cancel->new( 595 %args, # consumer_tag 596 nowait => 0, 597 ), 598 $self->{id}, 599 ); 600 601 return $self; 602} 603 604sub _canceled { 605 my $self = shift; 606 my ($tag, $frame,) = @_; 607 608 my $cons_cbs = delete $self->{_consumer_cbs}->{$tag} 609 or return 0; 610 611 shift @$cons_cbs; # no more deliveries 612 for my $cb (reverse @$cons_cbs) { 613 $cb->($frame); 614 } 615 return 1; 616} 617 618sub get { 619 my $self = shift; 620 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 621 622 my $no_ack = delete $args{no_ack} // 1; 623 624 return $self if !$self->_check_open($failure_cb); 625 626 $self->{connection}->_push_write_and_read( 627 'Basic::Get', 628 { 629 no_ack => $no_ack, 630 %args, # queue 631 ticket => 0, 632 }, 633 [qw(Basic::GetOk Basic::GetEmpty)], 634 sub { 635 my $frame = shift; 636 return $cb->({empty => $frame}) 637 if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty'); 638 $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb); 639 }, 640 $failure_cb, 641 $self->{id}, 642 ); 643 644 return $self; 645} 646 647sub ack { 648 my $self = shift; 649 my %args = @_; 650 651 return $self if !$self->_check_open(sub {}); 652 653 $self->{connection}->_push_write( 654 Net::AMQP::Protocol::Basic::Ack->new( 655 delivery_tag => 0, 656 multiple => ( 657 defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1 658 ), 659 %args, 660 ), 661 $self->{id}, 662 ); 663 664 return $self; 665} 666 667sub qos { 668 my $self = shift; 669 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 670 671 return $self if !$self->_check_open($failure_cb); 672 673 $self->{connection}->_push_write_and_read( 674 'Basic::Qos', 675 { 676 prefetch_count => 1, 677 prefetch_size => 0, 678 global => 0, 679 %args, 680 }, 681 'Basic::QosOk', 682 $cb, 683 $failure_cb, 684 $self->{id}, 685 ); 686 687 return $self; 688} 689 690sub confirm { 691 my $self = shift; 692 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 693 694 return $self if !$self->_check_open($failure_cb); 695 return $self if !$self->_check_version(0, 9, $failure_cb); 696 697 weaken(my $wself = $self); 698 699 $self->{connection}->_push_write_and_read( 700 'Confirm::Select', 701 { 702 %args, 703 nowait => 0, # FIXME 704 }, 705 'Confirm::SelectOk', 706 sub { 707 my $me = $wself or return; 708 $me->{_is_confirm} = 1; 709 $cb->(); 710 }, 711 $failure_cb, 712 $self->{id}, 713 ); 714 715 return $self; 716} 717 718sub recover { 719 my $self = shift; 720 my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); 721 722 return $self if !$self->_check_open(sub {}); 723 724 $self->{connection}->_push_write( 725 Net::AMQP::Protocol::Basic::Recover->new( 726 requeue => 1, 727 %args, 728 ), 729 $self->{id}, 730 ); 731 732 if (!$args{nowait} && $self->_check_version(0, 9)) { 733 $self->{connection}->_push_read_and_valid( 734 'Basic::RecoverOk', 735 $cb, 736 $failure_cb, 737 $self->{id}, 738 ); 739 } 740 else { 741 $cb->(); 742 } 743 744 return $self; 745} 746 747sub reject { 748 my $self = shift; 749 my %args = @_; 750 751 return $self if !$self->_check_open( sub { } ); 752 753 $self->{connection}->_push_write( 754 Net::AMQP::Protocol::Basic::Reject->new( 755 delivery_tag => 0, 756 requeue => 0, 757 %args, 758 ), 759 $self->{id}, 760 ); 761 762 return $self; 763} 764 765sub select_tx { 766 my $self = shift; 767 my ($cb, $failure_cb,) = $self->_delete_cbs(@_); 768 769 return $self if !$self->_check_open($failure_cb); 770 771 $self->{connection}->_push_write_and_read( 772 'Tx::Select', {}, 'Tx::SelectOk', 773 $cb, 774 $failure_cb, 775 $self->{id}, 776 ); 777 778 return $self; 779} 780 781sub commit_tx { 782 my $self = shift; 783 my ($cb, $failure_cb,) = $self->_delete_cbs(@_); 784 785 return $self if !$self->_check_open($failure_cb); 786 787 $self->{connection}->_push_write_and_read( 788 'Tx::Commit', {}, 'Tx::CommitOk', 789 $cb, 790 $failure_cb, 791 $self->{id}, 792 ); 793 794 return $self; 795} 796 797sub rollback_tx { 798 my $self = shift; 799 my ($cb, $failure_cb,) = $self->_delete_cbs(@_); 800 801 return $self if !$self->_check_open($failure_cb); 802 803 $self->{connection}->_push_write_and_read( 804 'Tx::Rollback', {}, 'Tx::RollbackOk', 805 $cb, 806 $failure_cb, 807 $self->{id}, 808 ); 809 810 return $self; 811} 812 813sub push_queue_or_consume { 814 my $self = shift; 815 my ($frame, $failure_cb,) = @_; 816 817 # Note: the spec says that after a party sends Channel::Close, it MUST 818 # discard all frames for that channel other than Close and CloseOk. 819 820 if ($frame->isa('Net::AMQP::Frame::Method')) { 821 my $method_frame = $frame->method_frame; 822 if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { 823 $self->{connection}->_push_write( 824 Net::AMQP::Protocol::Channel::CloseOk->new(), 825 $self->{id}, 826 ); 827 $self->_closed($frame); 828 $self->_orphan(); 829 return $self; 830 } elsif ($self->{_state} != _ST_OPEN) { 831 if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') || 832 $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) { 833 $self->{_queue}->push($frame); 834 } 835 return $self; 836 } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { 837 my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag}; 838 my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {}; 839 $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb); 840 return $self; 841 } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') || 842 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) { 843 # CancelOk means we asked for a cancel. 844 # Cancel means queue was deleted; it is not AMQP, but RMQ supports it. 845 if (!$self->_canceled($method_frame->consumer_tag, $frame) 846 && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) { 847 $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag); 848 } 849 return $self; 850 } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { 851 weaken(my $wself = $self); 852 my $cb = sub { 853 my $ret = shift; 854 my $me = $wself or return; 855 my $headers = $ret->{header}->headers || {}; 856 my $onret_cb; 857 if (defined(my $tag = $headers->{_ar_return})) { 858 my $cbs = $me->{_publish_cbs}->{$tag}; 859 $onret_cb = $cbs->[2] if $cbs; 860 } 861 $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well 862 $onret_cb->($frame); 863 }; 864 $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb); 865 return $self; 866 } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') || 867 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) { 868 (my $resp = ref($method_frame)) =~ s/.*:://; 869 my $cbs; 870 if (!$self->{_is_confirm}) { 871 $failure_cb->("Received $resp when not in confirm mode"); 872 } 873 else { 874 my @tags; 875 if ($method_frame->{multiple}) { 876 @tags = sort { $a <=> $b } 877 grep { $_ <= $method_frame->{delivery_tag} } 878 keys %{$self->{_publish_cbs}}; 879 } 880 else { 881 @tags = ($method_frame->{delivery_tag}); 882 } 883 my $cbi = ($resp eq 'Ack') ? 0 : 1; 884 for my $tag (@tags) { 885 my $cbs; 886 if (not $cbs = delete $self->{_publish_cbs}->{$tag}) { 887 $failure_cb->("Received $resp of unknown delivery tag $tag"); 888 } 889 elsif ($cbs->[$cbi]) { 890 $cbs->[$cbi]->($frame); 891 } 892 } 893 } 894 return $self; 895 } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { 896 $self->{_is_active} = $method_frame->active; 897 $self->{connection}->_push_write( 898 Net::AMQP::Protocol::Channel::FlowOk->new( 899 active => $method_frame->active, 900 ), 901 $self->{id}, 902 ); 903 my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive'; 904 my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {}; 905 $cb->($frame); 906 return $self; 907 } 908 $self->{_queue}->push($frame); 909 } else { 910 $self->{_content_queue}->push($frame); 911 } 912 913 return $self; 914} 915 916sub _push_read_header_and_body { 917 my $self = shift; 918 my ($type, $frame, $cb, $failure_cb,) = @_; 919 my $response = {$type => $frame}; 920 my $body_size = 0; 921 my $body_payload = ""; 922 923 weaken(my $wcontq = $self->{_content_queue}); 924 my $w_body_frame; 925 my $body_frame = sub { 926 my $frame = shift; 927 928 return $failure_cb->('Received data is not body frame') 929 if !$frame->isa('Net::AMQP::Frame::Body'); 930 931 $body_payload .= $frame->payload; 932 933 if (length($body_payload) < $body_size) { 934 # More to come 935 my $contq = $wcontq or return; 936 $contq->get($w_body_frame); 937 } 938 else { 939 $frame->payload($body_payload); 940 $response->{body} = $frame; 941 $cb->($response); 942 } 943 }; 944 $w_body_frame = $body_frame; 945 weaken($w_body_frame); 946 947 $self->{_content_queue}->get(sub{ 948 my $frame = shift; 949 950 return $failure_cb->('Received data is not header frame') 951 if !$frame->isa('Net::AMQP::Frame::Header'); 952 953 my $header_frame = $frame->header_frame; 954 return $failure_cb->( 955 'Header is not Protocol::Basic::ContentHeader' 956 . 'Header was ' . ref $header_frame 957 ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); 958 959 $response->{header} = $header_frame; 960 961 $body_size = $frame->body_size; 962 if ( $body_size ) { 963 my $contq = $wcontq or return; 964 $contq->get($body_frame); 965 } else { 966 $response->{body} = undef; 967 $cb->($response); 968 } 969 }); 970 971 return $self; 972} 973 974sub _delete_cbs { 975 my $self = shift; 976 my %args = @_; 977 978 my $cb = delete $args{on_success} || sub {}; 979 my $failure_cb = delete $args{on_failure} || sub {die @_}; 980 981 return $cb, $failure_cb, %args; 982} 983 984sub _check_open { 985 my $self = shift; 986 my ($failure_cb) = @_; 987 988 return 1 if $self->is_open(); 989 990 $failure_cb->('Channel has already been closed'); 991 return 0; 992} 993 994sub _check_version { 995 my $self = shift; 996 my ($major, $minor, $failure_cb) = @_; 997 998 my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR; 999 my $amin = $Net::AMQP::Protocol::VERSION_MINOR; 1000 1001 return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor; 1002 1003 $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb; 1004 return 0; 1005} 1006 1007sub DESTROY { 1008 my $self = shift; 1009 $self->close() if !in_global_destruction && $self->is_open(); 1010 return; 1011} 1012 10131; 1014__END__ 1015 1016=head1 NAME 1017 1018AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel. 1019 1020=head1 SYNOPSIS 1021 1022 my $ch = $rf->open_channel(); 1023 $ch->declare_exchange(exchange => 'test_exchange'); 1024 1025=head1 DESCRIPTION 1026 1027A RabbitMQ channel. 1028 1029A channel is a light-weight virtual connection within a TCP connection to a 1030RabbitMQ broker. 1031 1032=head1 ARGUMENTS FOR C<open_channel> 1033 1034=over 1035 1036=item on_close 1037 1038Callback invoked when the channel closes. Callback will be passed the 1039incoming message that caused the close, if any. 1040 1041=item on_return 1042 1043Callback invoked when a mandatory or immediate message publish fails. 1044Callback will be passed the incoming message, with accessors 1045C<method_frame>, C<header_frame>, and C<body_frame>. 1046 1047=back 1048 1049=head1 METHODS 1050 1051=head2 declare_exchange (%args) 1052 1053Declare an exchange (to publish messages to) on the server. 1054 1055Arguments: 1056 1057=over 1058 1059=item on_success 1060 1061=item on_failure 1062 1063=item type 1064 1065Default 'direct' 1066 1067=item passive 1068 1069Default 0 1070 1071=item durable 1072 1073Default 0 1074 1075=item auto_delete 1076 1077Default 0 1078 1079=item internal 1080 1081Default 0 1082 1083=item exchange 1084 1085The name of the exchange 1086 1087=back 1088 1089=head2 bind_exchange 1090 1091Binds an exchange to another exchange, with a routing key. 1092 1093Arguments: 1094 1095=over 1096 1097=item source 1098 1099The name of the source exchange to bind 1100 1101=item destination 1102 1103The name of the destination exchange to bind 1104 1105=item routing_key 1106 1107The routing key to bind with 1108 1109=back 1110 1111=head2 unbind_exchange 1112 1113=head2 delete_exchange 1114 1115=head2 declare_queue 1116 1117Declare a queue (create it if it doesn't exist yet) for publishing messages 1118to on the server. 1119 1120 my $done = AnyEvent->condvar; 1121 $channel->declare_queue( 1122 exchange => $queue_exchange, 1123 queue => $queueName, 1124 durable => 0, 1125 auto_delete => 1, 1126 passive => 0, 1127 arguments => { 'x-expires' => 0, }, 1128 on_success => sub { $done->send; }, 1129 on_failure => sub { 1130 say "Unable to create queue $queueName"; 1131 $done->send; 1132 }, 1133 ); 1134 $done->recv; 1135 1136Arguments: 1137 1138=over 1139 1140=item queue 1141 1142Name of the queue to be declared. If the queue name is the empty string, 1143RabbitMQ will create a unique name for the queue. This is useful for 1144temporary/private reply queues. 1145 1146=item on_success 1147 1148Callback that is called when the queue was declared successfully. The argument 1149to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the 1150Queue (if you declared it with an empty name), you can say 1151 1152 on_success => sub { 1153 my $method = shift; 1154 my $name = $method->method_frame->queue; 1155 }; 1156 1157=item on_failure 1158 1159Callback that is called when the declaration of the queue has failed. 1160 1161=item auto_delete 1162 11630 or 1, default 0 1164 1165=item passive 1166 11670 or 1, default 0 1168 1169=item durable 1170 11710 or 1, default 0 1172 1173=item exclusive 1174 11750 or 1, default 0 1176 1177=item no_ack 1178 11790 or 1, default 1 1180 1181=item ticket 1182 1183default 0 1184 1185=for comment 1186XXX Is "exchange" a valid parameter? 1187 1188=item arguments 1189 1190C<arguments> is a hashref of additional parameters which RabbitMQ extensions 1191may use. This list is not complete and your RabbitMQ server configuration will 1192determine which arguments are valid and how they act. 1193 1194=over 1195 1196=item x-expires 1197 1198The queue will automatically be removed after being idle for this many milliseconds. 1199 1200Default of 0 disables automatic queue removal. 1201 1202=back 1203 1204=back 1205 1206=head2 bind_queue 1207 1208Binds a queue to an exchange, with a routing key. 1209 1210Arguments: 1211 1212=over 1213 1214=item queue 1215 1216The name of the queue to bind 1217 1218=item exchange 1219 1220The name of the exchange to bind 1221 1222=item routing_key 1223 1224The routing key to bind with 1225 1226=back 1227 1228=head2 unbind_queue 1229 1230=head2 purge_queue 1231 1232Flushes the contents of a queue. 1233 1234=head2 delete_queue 1235 1236Deletes a queue. The queue may not have any active consumers. 1237 1238=head2 consume 1239 1240Subscribe to consume messages from a queue. 1241 1242Arguments: 1243 1244=over 1245 1246=item queue 1247 1248The name of the queue to be consumed from. 1249 1250=item on_consume 1251 1252Callback called with an argument of the message which has been consumed. 1253 1254The message is a hash reference, where the value to key C<header> is an object 1255of type L<Net::AMQP::Protocol::Basic::ContentHeader>, L<body> is a 1256L<Net::AMQP::Frame::Body>, and C<deliver> a L<Net::AMQP::Frame::Method>. 1257 1258=item on_cancel 1259 1260Callback called if consumption is cancelled. This may be at client request 1261or as a side effect of queue deletion. (Notification of queue deletion is a 1262RabbitMQ extension.) 1263 1264=item consumer_tag 1265 1266Identifies this consumer, will be auto-generated if you do not provide it, but you must 1267supply a value if you want to be able to later cancel the subscription. 1268 1269=item on_success 1270 1271Callback called if the subscription was successful (before the first message is consumed). 1272 1273=item on_failure 1274 1275Callback called if the subscription fails for any reason. 1276 1277=item no_ack 1278 1279Pass through the C<no_ack> flag. Defaults to C<1>. If set to C<1>, the server 1280will not expect messages to be acknowledged. 1281 1282=back 1283 1284=head2 publish 1285 1286Publish a message to an exchange. 1287 1288Arguments: 1289 1290=over 1291 1292=item exchange 1293 1294The name of the exchange to send the message to. 1295 1296=item routing_key 1297 1298The routing key with which to publish the message. 1299 1300=item header 1301 1302Hash of AMQP message header info, including the confusingly similar element "headers", 1303which may contain arbitrary string key/value pairs. 1304 1305=item body 1306 1307The text body of the message to send. 1308 1309=item mandatory 1310 1311Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no 1312bindings), it will be "returned." (see "on_return") 1313 1314=item immediate 1315 1316Boolean; if true, then if the message cannot be delivered directly to a consumer, it 1317will be "returned." (see "on_return") 1318 1319=item on_ack 1320 1321Callback called with the frame that acknowledges receipt (if channel is in confirm mode), 1322typically L<Net::AMQP::Protocol::Basic::Ack>. 1323 1324=item on_nack 1325 1326Callback called with the frame that declines receipt (if the channel is in confirm mode), 1327typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>. 1328 1329=item on_return 1330 1331In AMQP, a "returned" message is one that cannot be delivered in compliance with the 1332C<immediate> or C<mandatory> flags. 1333 1334If in confirm mode, this callback will be called with the frame that reports message 1335return, typically L<Net::AMQP::Protocol::Basic::Return>. If confirm mode is off or 1336this callback is not provided, then the channel or connection objects' on_return 1337callbacks (if any), will be called instead. 1338 1339NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or 1340not on_return is called first. 1341 1342=back 1343 1344=head2 cancel 1345 1346Cancel a queue subscription. 1347 1348Note that the cancellation B<will not> take place at once, and further messages may be 1349consumed before the subscription is cancelled. No further messages will be 1350consumed after the on_success callback has been called. 1351 1352Arguments: 1353 1354=over 1355 1356=item consumer_tag 1357 1358Identifies this consumer, needs to be the value supplied when the queue is initially 1359consumed from. 1360 1361=item on_success 1362 1363Callback called if the subscription was successfully cancelled. 1364 1365=item on_failure 1366 1367Callback called if the subscription could not be cancelled for any reason. 1368 1369=back 1370 1371=head2 get 1372 1373Try to get a single message from a queue. 1374 1375Arguments: 1376 1377=over 1378 1379=item queue 1380 1381Mandatory. Name of the queue to try to receive a message from. 1382 1383=item on_success 1384 1385Will be called either with either a message, or, if the queue is empty, 1386a notification that there was nothing to collect from the queue. 1387 1388=item on_failure 1389 1390This callback will be called if an error is signalled on this channel. 1391 1392=item no_ack 1393 13940 or 1, default 1 1395 1396=back 1397 1398=head2 ack 1399 1400=head2 qos 1401 1402=head2 confirm 1403 1404Put channel into confirm mode. In confirm mode, publishes are confirmed by 1405the server, so the on_ack callback of publish works. 1406 1407=head2 recover 1408 1409=head2 select_tx 1410 1411=head2 commit_tx 1412 1413=head2 rollback_tx 1414 1415=head1 AUTHOR, COPYRIGHT AND LICENSE 1416 1417See L<AnyEvent::RabbitMQ> for author(s), copyright and license. 1418 1419=cut 1420