1"""Using Pika with a Twisted reactor. 2 3The interfaces in this module are Deferred-based when possible. This means that 4the connection.channel() method and most of the channel methods return 5Deferreds instead of taking a callback argument and that basic_consume() 6returns a Twisted DeferredQueue where messages from the server will be 7stored. Refer to the docstrings for TwistedProtocolConnection.channel() and the 8TwistedChannel class for details. 9 10""" 11 12import functools 13import logging 14from collections import namedtuple 15 16from twisted.internet import (defer, error as twisted_error, reactor, protocol) 17from twisted.python.failure import Failure 18 19import pika.connection 20from pika import exceptions, spec 21from pika.adapters.utils import nbio_interface 22from pika.adapters.utils.io_services_utils import check_callback_arg 23 24# Twistisms 25# pylint: disable=C0111,C0103 26# Other 27# pylint: disable=too-many-lines 28 29LOGGER = logging.getLogger(__name__) 30 31 32class ClosableDeferredQueue(defer.DeferredQueue): 33 """ 34 Like the normal Twisted DeferredQueue, but after close() is called with an 35 exception instance all pending Deferreds are errbacked and further attempts 36 to call get() or put() return a Failure wrapping that exception. 37 """ 38 39 def __init__(self, size=None, backlog=None): 40 self.closed = None 41 super(ClosableDeferredQueue, self).__init__(size, backlog) 42 43 def put(self, obj): 44 """ 45 Like the original :meth:`DeferredQueue.put` method, but returns an 46 errback if the queue is closed. 47 48 """ 49 if self.closed: 50 LOGGER.error('Impossible to put to the queue, it is closed.') 51 return defer.fail(self.closed) 52 return defer.DeferredQueue.put(self, obj) 53 54 def get(self): 55 """ 56 Returns a Deferred that will fire with the next item in the queue, when 57 it's available. 58 59 The Deferred will errback if the queue is closed. 60 61 :returns: Deferred that fires with the next item. 62 :rtype: Deferred 63 64 """ 65 if self.closed: 66 LOGGER.error('Impossible to get from the queue, it is closed.') 67 return defer.fail(self.closed) 68 return defer.DeferredQueue.get(self) 69 70 def close(self, reason): 71 """Closes the queue. 72 73 Errback the pending calls to :meth:`get()`. 74 75 """ 76 if self.closed: 77 LOGGER.warning('Queue was already closed with reason: %s.', 78 self.closed) 79 self.closed = reason 80 while self.waiting: 81 self.waiting.pop().errback(reason) 82 self.pending = [] 83 84 85ReceivedMessage = namedtuple("ReceivedMessage", 86 ["channel", "method", "properties", "body"]) 87 88 89class TwistedChannel(object): 90 """A wrapper around Pika's Channel. 91 92 Channel methods that normally take a callback argument are wrapped to 93 return a Deferred that fires with whatever would be passed to the callback. 94 If the channel gets closed, all pending Deferreds are errbacked with a 95 ChannelClosed exception. The returned Deferreds fire with whatever 96 arguments the callback to the original method would receive. 97 98 Some methods like basic_consume and basic_get are wrapped in a special way, 99 see their docstrings for details. 100 """ 101 102 def __init__(self, channel): 103 self._channel = channel 104 self._closed = None 105 self._calls = set() 106 self._consumers = {} 107 # Store Basic.Get calls so we can handle GetEmpty replies 108 self._basic_get_deferred = None 109 self._channel.add_callback(self._on_getempty, [spec.Basic.GetEmpty], 110 False) 111 # We need this mapping to close the ClosableDeferredQueue when a queue 112 # is deleted. 113 self._queue_name_to_consumer_tags = {} 114 # Whether RabbitMQ delivery confirmation has been enabled 115 self._delivery_confirmation = False 116 self._delivery_message_id = None 117 self._deliveries = {} 118 # Holds a ReceivedMessage object representing a message received via 119 # Basic.Return in publisher-acknowledgments mode. 120 self._puback_return = None 121 122 self.on_closed = defer.Deferred() 123 self._channel.add_on_close_callback(self._on_channel_closed) 124 self._channel.add_on_cancel_callback( 125 self._on_consumer_cancelled_by_broker) 126 127 def __repr__(self): 128 return '<{cls} channel={chan!r}>'.format( 129 cls=self.__class__.__name__, chan=self._channel) 130 131 def _on_channel_closed(self, _channel, reason): 132 # enter the closed state 133 self._closed = reason 134 # errback all pending calls 135 for d in self._calls: 136 d.errback(self._closed) 137 # close all open queues 138 for consumer in self._consumers.values(): 139 consumer.close(self._closed) 140 # release references to stored objects 141 self._calls = set() 142 self._consumers = {} 143 self.on_closed.callback(self._closed) 144 145 def _on_consumer_cancelled_by_broker(self, method_frame): 146 """Called by impl when broker cancels consumer via Basic.Cancel. 147 148 This is a RabbitMQ-specific feature. The circumstances include deletion 149 of queue being consumed as well as failure of a HA node responsible for 150 the queue being consumed. 151 152 :param pika.frame.Method method_frame: method frame with the 153 `spec.Basic.Cancel` method 154 155 """ 156 return self._on_consumer_cancelled(method_frame) 157 158 def _on_consumer_cancelled(self, frame): 159 """Called when the broker cancels a consumer via Basic.Cancel or when 160 the broker responds to a Basic.Cancel request by Basic.CancelOk. 161 162 :param pika.frame.Method frame: method frame with the 163 `spec.Basic.Cancel` or `spec.Basic.CancelOk` method 164 165 """ 166 consumer_tag = frame.method.consumer_tag 167 if consumer_tag not in self._consumers: 168 # Could be cancelled by user or broker earlier 169 LOGGER.warning('basic_cancel - consumer not found: %s', 170 consumer_tag) 171 return frame 172 self._consumers[consumer_tag].close(exceptions.ConsumerCancelled()) 173 del self._consumers[consumer_tag] 174 # Remove from the queue-to-ctags index: 175 for ctags in self._queue_name_to_consumer_tags.values(): 176 try: 177 ctags.remove(consumer_tag) 178 except KeyError: 179 continue 180 return frame 181 182 def _on_getempty(self, _method_frame): 183 """Callback the Basic.Get deferred with None. 184 """ 185 if self._basic_get_deferred is None: 186 LOGGER.warning("Got Basic.GetEmpty but no Basic.Get calls " 187 "were pending.") 188 return 189 self._basic_get_deferred.callback(None) 190 191 def _wrap_channel_method(self, name): 192 """Wrap Pika's Channel method to make it return a Deferred that fires 193 when the method completes and errbacks if the channel gets closed. If 194 the original method's callback would receive more than one argument, 195 the Deferred fires with a tuple of argument values. 196 197 """ 198 method = getattr(self._channel, name) 199 200 @functools.wraps(method) 201 def wrapped(*args, **kwargs): 202 if self._closed: 203 return defer.fail(self._closed) 204 205 d = defer.Deferred() 206 self._calls.add(d) 207 d.addCallback(self._clear_call, d) 208 209 def single_argument(*args): 210 """ 211 Make sure that the deferred is called with a single argument. 212 In case the original callback fires with more than one, convert 213 to a tuple. 214 """ 215 if len(args) > 1: 216 d.callback(tuple(args)) 217 else: 218 d.callback(*args) 219 220 kwargs['callback'] = single_argument 221 222 try: 223 method(*args, **kwargs) 224 except Exception: # pylint: disable=W0703 225 return defer.fail() 226 return d 227 228 return wrapped 229 230 def _clear_call(self, ret, d): 231 self._calls.discard(d) 232 return ret 233 234 # Public Channel attributes 235 236 @property 237 def channel_number(self): 238 return self._channel.channel_number 239 240 @property 241 def connection(self): 242 return self._channel.connection 243 244 @property 245 def is_closed(self): 246 """Returns True if the channel is closed. 247 248 :rtype: bool 249 250 """ 251 return self._channel.is_closed 252 253 @property 254 def is_closing(self): 255 """Returns True if client-initiated closing of the channel is in 256 progress. 257 258 :rtype: bool 259 260 """ 261 return self._channel.is_closing 262 263 @property 264 def is_open(self): 265 """Returns True if the channel is open. 266 267 :rtype: bool 268 269 """ 270 return self._channel.is_open 271 272 @property 273 def flow_active(self): 274 return self._channel.flow_active 275 276 @property 277 def consumer_tags(self): 278 return self._channel.consumer_tags 279 280 # Deferred-equivalents of public Channel methods 281 282 def callback_deferred(self, deferred, replies): 283 """Pass in a Deferred and a list replies from the RabbitMQ broker which 284 you'd like the Deferred to be callbacked on with the frame as callback 285 value. 286 287 :param Deferred deferred: The Deferred to callback 288 :param list replies: The replies to callback on 289 290 """ 291 self._channel.add_callback(deferred.callback, replies) 292 293 # Public Channel methods 294 295 def add_on_return_callback(self, callback): 296 """Pass a callback function that will be called when a published 297 message is rejected and returned by the server via `Basic.Return`. 298 299 :param callable callback: The method to call on callback with the 300 message as only argument. The message is a named tuple with 301 the following attributes: 302 channel: this TwistedChannel 303 method: pika.spec.Basic.Return 304 properties: pika.spec.BasicProperties 305 body: bytes 306 307 """ 308 self._channel.add_on_return_callback( 309 lambda _channel, method, properties, body: callback( 310 ReceivedMessage( 311 channel=self, 312 method=method, 313 properties=properties, 314 body=body, 315 ) 316 ) 317 ) 318 319 def basic_ack(self, delivery_tag=0, multiple=False): 320 """Acknowledge one or more messages. When sent by the client, this 321 method acknowledges one or more messages delivered via the Deliver or 322 Get-Ok methods. When sent by server, this method acknowledges one or 323 more messages published with the Publish method on a channel in 324 confirm mode. The acknowledgement can be for a single message or a 325 set of messages up to and including a specific message. 326 327 :param integer delivery_tag: int/long The server-assigned delivery tag 328 :param bool multiple: If set to True, the delivery tag is treated as 329 "up to and including", so that multiple messages 330 can be acknowledged with a single method. If set 331 to False, the delivery tag refers to a single 332 message. If the multiple field is 1, and the 333 delivery tag is zero, this indicates 334 acknowledgement of all outstanding messages. 335 336 """ 337 return self._channel.basic_ack( 338 delivery_tag=delivery_tag, multiple=multiple) 339 340 def basic_cancel(self, consumer_tag=''): 341 """This method cancels a consumer. This does not affect already 342 delivered messages, but it does mean the server will not send any more 343 messages for that consumer. The client may receive an arbitrary number 344 of messages in between sending the cancel method and receiving the 345 cancel-ok reply. It may also be sent from the server to the client in 346 the event of the consumer being unexpectedly cancelled (i.e. cancelled 347 for any reason other than the server receiving the corresponding 348 basic.cancel from the client). This allows clients to be notified of 349 the loss of consumers due to events such as queue deletion. 350 351 This method wraps :meth:`Channel.basic_cancel 352 <pika.channel.Channel.basic_cancel>` and closes any deferred queue 353 associated with that consumer. 354 355 :param str consumer_tag: Identifier for the consumer 356 :returns: Deferred that fires on the Basic.CancelOk response 357 :rtype: Deferred 358 :raises ValueError: 359 360 """ 361 wrapped = self._wrap_channel_method('basic_cancel') 362 d = wrapped(consumer_tag=consumer_tag) 363 return d.addCallback(self._on_consumer_cancelled) 364 365 def basic_consume(self, 366 queue, 367 auto_ack=False, 368 exclusive=False, 369 consumer_tag=None, 370 arguments=None): 371 """Consume from a server queue. 372 373 Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds 374 messages for the consumer_tag to a 375 :class:`ClosableDeferredQueue`. If you do not pass in a 376 consumer_tag, one will be automatically generated for you. 377 378 For more information on basic_consume, see: 379 Tutorial 2 at http://www.rabbitmq.com/getstarted.html 380 http://www.rabbitmq.com/confirms.html 381 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume 382 383 :param str queue: The queue to consume from. Use the empty string to 384 specify the most recent server-named queue for this channel. 385 :param bool auto_ack: if set to True, automatic acknowledgement mode 386 will be used (see http://www.rabbitmq.com/confirms.html). This 387 corresponds with the 'no_ack' parameter in the basic.consume AMQP 388 0.9.1 method 389 :param bool exclusive: Don't allow other consumers on the queue 390 :param str consumer_tag: Specify your own consumer tag 391 :param dict arguments: Custom key/value pair arguments for the consumer 392 :returns: Deferred that fires with a tuple 393 ``(queue_object, consumer_tag)``. The Deferred will errback with an 394 instance of :class:`exceptions.ChannelClosed` if the call fails. 395 The queue object is an instance of :class:`ClosableDeferredQueue`, 396 where data received from the queue will be stored. Clients should 397 use its :meth:`get() <ClosableDeferredQueue.get>` method to fetch 398 an individual message, which will return a Deferred firing with a 399 namedtuple whose attributes are: 400 - channel: this TwistedChannel 401 - method: pika.spec.Basic.Deliver 402 - properties: pika.spec.BasicProperties 403 - body: bytes 404 :rtype: Deferred 405 406 """ 407 if self._closed: 408 return defer.fail(self._closed) 409 410 queue_obj = ClosableDeferredQueue() 411 d = defer.Deferred() 412 self._calls.add(d) 413 414 def on_consume_ok(frame): 415 consumer_tag = frame.method.consumer_tag 416 self._queue_name_to_consumer_tags.setdefault( 417 queue, set()).add(consumer_tag) 418 self._consumers[consumer_tag] = queue_obj 419 self._calls.discard(d) 420 d.callback((queue_obj, consumer_tag)) 421 422 def on_message_callback(_channel, method, properties, body): 423 """Add the ReceivedMessage to the queue, while replacing the 424 channel implementation. 425 """ 426 queue_obj.put( 427 ReceivedMessage( 428 channel=self, 429 method=method, 430 properties=properties, 431 body=body, 432 )) 433 434 try: 435 self._channel.basic_consume( 436 queue=queue, 437 on_message_callback=on_message_callback, 438 auto_ack=auto_ack, 439 exclusive=exclusive, 440 consumer_tag=consumer_tag, 441 arguments=arguments, 442 callback=on_consume_ok, 443 ) 444 except Exception: # pylint: disable=W0703 445 return defer.fail() 446 447 return d 448 449 def basic_get(self, queue, auto_ack=False): 450 """Get a single message from the AMQP broker. 451 452 Will return If the queue is empty, it will return None. 453 If you want to 454 be notified of Basic.GetEmpty, use the Channel.add_callback method 455 adding your Basic.GetEmpty callback which should expect only one 456 parameter, frame. Due to implementation details, this cannot be called 457 a second time until the callback is executed. For more information on 458 basic_get and its parameters, see: 459 460 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get 461 462 This method wraps :meth:`Channel.basic_get 463 <pika.channel.Channel.basic_get>`. 464 465 :param str queue: The queue from which to get a message. Use the empty 466 string to specify the most recent server-named queue 467 for this channel. 468 :param bool auto_ack: Tell the broker to not expect a reply 469 :returns: Deferred that fires with a namedtuple whose attributes are: 470 channel: this TwistedChannel 471 method: pika.spec.Basic.GetOk 472 properties: pika.spec.BasicProperties 473 body: bytes 474 If the queue is empty, None will be returned. 475 :rtype: Deferred 476 :raises pika.exceptions.DuplicateGetOkCallback: 477 478 """ 479 if self._basic_get_deferred is not None: 480 raise exceptions.DuplicateGetOkCallback() 481 482 def create_namedtuple(result): 483 if result is None: 484 return None 485 _channel, method, properties, body = result 486 return ReceivedMessage( 487 channel=self, 488 method=method, 489 properties=properties, 490 body=body, 491 ) 492 493 def cleanup_attribute(result): 494 self._basic_get_deferred = None 495 return result 496 497 d = self._wrap_channel_method("basic_get")( 498 queue=queue, auto_ack=auto_ack) 499 d.addCallback(create_namedtuple) 500 d.addBoth(cleanup_attribute) 501 self._basic_get_deferred = d 502 return d 503 504 def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): 505 """This method allows a client to reject one or more incoming messages. 506 It can be used to interrupt and cancel large incoming messages, or 507 return untreatable messages to their original queue. 508 509 :param integer delivery-tag: int/long The server-assigned delivery tag 510 :param bool multiple: If set to True, the delivery tag is treated as 511 "up to and including", so that multiple messages 512 can be acknowledged with a single method. If set 513 to False, the delivery tag refers to a single 514 message. If the multiple field is 1, and the 515 delivery tag is zero, this indicates 516 acknowledgement of all outstanding messages. 517 :param bool requeue: If requeue is true, the server will attempt to 518 requeue the message. If requeue is false or the 519 requeue attempt fails the messages are discarded 520 or dead-lettered. 521 522 """ 523 return self._channel.basic_nack( 524 delivery_tag=delivery_tag, 525 multiple=multiple, 526 requeue=requeue, 527 ) 528 529 def basic_publish(self, 530 exchange, 531 routing_key, 532 body, 533 properties=None, 534 mandatory=False): 535 """Publish to the channel with the given exchange, routing key and body. 536 537 This method wraps :meth:`Channel.basic_publish 538 <pika.channel.Channel.basic_publish>`, but makes sure the channel is 539 not closed before publishing. 540 541 For more information on basic_publish and what the parameters do, see: 542 543 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish 544 545 :param str exchange: The exchange to publish to 546 :param str routing_key: The routing key to bind on 547 :param bytes body: The message body 548 :param pika.spec.BasicProperties properties: Basic.properties 549 :param bool mandatory: The mandatory flag 550 :returns: A Deferred that fires with the result of the channel's 551 basic_publish. 552 :rtype: Deferred 553 :raises UnroutableError: raised when a message published in 554 publisher-acknowledgments mode (see 555 `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` 556 followed by `Basic.Ack`. 557 :raises NackError: raised when a message published in 558 publisher-acknowledgements mode is Nack'ed by the broker. See 559 `BlockingChannel.confirm_delivery`. 560 561 """ 562 if self._closed: 563 return defer.fail(self._closed) 564 result = self._channel.basic_publish( 565 exchange=exchange, 566 routing_key=routing_key, 567 body=body, 568 properties=properties, 569 mandatory=mandatory) 570 if not self._delivery_confirmation: 571 return defer.succeed(result) 572 else: 573 # See http://www.rabbitmq.com/confirms.html#publisher-confirms 574 self._delivery_message_id += 1 575 self._deliveries[self._delivery_message_id] = defer.Deferred() 576 return self._deliveries[self._delivery_message_id] 577 578 def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False): 579 """Specify quality of service. This method requests a specific quality 580 of service. The QoS can be specified for the current channel or for all 581 channels on the connection. The client can request that messages be 582 sent in advance so that when the client finishes processing a message, 583 the following message is already held locally, rather than needing to 584 be sent down the channel. Prefetching gives a performance improvement. 585 586 :param int prefetch_size: This field specifies the prefetch window 587 size. The server will send a message in 588 advance if it is equal to or smaller in size 589 than the available prefetch size (and also 590 falls into other prefetch limits). May be 591 set to zero, meaning "no specific limit", 592 although other prefetch limits may still 593 apply. The prefetch-size is ignored by 594 consumers who have enabled the no-ack 595 option. 596 :param int prefetch_count: Specifies a prefetch window in terms of 597 whole messages. This field may be used in 598 combination with the prefetch-size field; a 599 message will only be sent in advance if both 600 prefetch windows (and those at the channel 601 and connection level) allow it. The 602 prefetch-count is ignored by consumers who 603 have enabled the no-ack option. 604 :param bool global_qos: Should the QoS apply to all channels on the 605 connection. 606 :returns: Deferred that fires on the Basic.QosOk response 607 :rtype: Deferred 608 609 """ 610 return self._wrap_channel_method("basic_qos")( 611 prefetch_size=prefetch_size, 612 prefetch_count=prefetch_count, 613 global_qos=global_qos, 614 ) 615 616 def basic_reject(self, delivery_tag, requeue=True): 617 """Reject an incoming message. This method allows a client to reject a 618 message. It can be used to interrupt and cancel large incoming 619 messages, or return untreatable messages to their original queue. 620 621 :param integer delivery_tag: int/long The server-assigned delivery tag 622 :param bool requeue: If requeue is true, the server will attempt to 623 requeue the message. If requeue is false or the 624 requeue attempt fails the messages are discarded 625 or dead-lettered. 626 :raises: TypeError 627 628 """ 629 return self._channel.basic_reject( 630 delivery_tag=delivery_tag, requeue=requeue) 631 632 def basic_recover(self, requeue=False): 633 """This method asks the server to redeliver all unacknowledged messages 634 on a specified channel. Zero or more messages may be redelivered. This 635 method replaces the asynchronous Recover. 636 637 :param bool requeue: If False, the message will be redelivered to the 638 original recipient. If True, the server will 639 attempt to requeue the message, potentially then 640 delivering it to an alternative subscriber. 641 :returns: Deferred that fires on the Basic.RecoverOk response 642 :rtype: Deferred 643 644 """ 645 return self._wrap_channel_method("basic_recover")(requeue=requeue) 646 647 def close(self, reply_code=0, reply_text="Normal shutdown"): 648 """Invoke a graceful shutdown of the channel with the AMQP Broker. 649 650 If channel is OPENING, transition to CLOSING and suppress the incoming 651 Channel.OpenOk, if any. 652 653 :param int reply_code: The reason code to send to broker 654 :param str reply_text: The reason text to send to broker 655 656 :raises ChannelWrongStateError: if channel is closed or closing 657 658 """ 659 return self._channel.close(reply_code=reply_code, reply_text=reply_text) 660 661 def confirm_delivery(self): 662 """Turn on Confirm mode in the channel. Pass in a callback to be 663 notified by the Broker when a message has been confirmed as received or 664 rejected (Basic.Ack, Basic.Nack) from the broker to the publisher. 665 666 For more information see: 667 http://www.rabbitmq.com/confirms.html#publisher-confirms 668 669 :returns: Deferred that fires on the Confirm.SelectOk response 670 :rtype: Deferred 671 672 """ 673 if self._delivery_confirmation: 674 LOGGER.error('confirm_delivery: confirmation was already enabled.') 675 return defer.succeed(None) 676 wrapped = self._wrap_channel_method('confirm_delivery') 677 d = wrapped(ack_nack_callback=self._on_delivery_confirmation) 678 679 def set_delivery_confirmation(result): 680 self._delivery_confirmation = True 681 self._delivery_message_id = 0 682 LOGGER.debug("Delivery confirmation enabled.") 683 return result 684 685 d.addCallback(set_delivery_confirmation) 686 # Unroutable messages returned after this point will be in the context 687 # of publisher acknowledgments 688 self._channel.add_on_return_callback(self._on_puback_message_returned) 689 return d 690 691 def _on_delivery_confirmation(self, method_frame): 692 """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC 693 command, passing in either a Basic.Ack or Basic.Nack frame with 694 the delivery tag of the message that was published. The delivery tag 695 is an integer counter indicating the message number that was sent 696 on the channel via Basic.Publish. Here we're just doing house keeping 697 to keep track of stats and remove message numbers that we expect 698 a delivery confirmation of from the list used to keep track of messages 699 that are pending confirmation. 700 701 :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame 702 703 """ 704 delivery_tag = method_frame.method.delivery_tag 705 if delivery_tag not in self._deliveries: 706 LOGGER.error("Delivery tag %s not found in the pending deliveries", 707 delivery_tag) 708 return 709 if method_frame.method.multiple: 710 tags = [tag for tag in self._deliveries if tag <= delivery_tag] 711 tags.sort() 712 else: 713 tags = [delivery_tag] 714 for tag in tags: 715 d = self._deliveries[tag] 716 del self._deliveries[tag] 717 if isinstance(method_frame.method, pika.spec.Basic.Nack): 718 # Broker was unable to process the message due to internal 719 # error 720 LOGGER.warning( 721 "Message was Nack'ed by broker: nack=%r; channel=%s;", 722 method_frame.method, self.channel_number) 723 if self._puback_return is not None: 724 returned_messages = [self._puback_return] 725 self._puback_return = None 726 else: 727 returned_messages = [] 728 d.errback(exceptions.NackError(returned_messages)) 729 else: 730 assert isinstance(method_frame.method, pika.spec.Basic.Ack) 731 if self._puback_return is not None: 732 # Unroutable message was returned 733 returned_messages = [self._puback_return] 734 self._puback_return = None 735 d.errback(exceptions.UnroutableError(returned_messages)) 736 else: 737 d.callback(method_frame.method) 738 739 def _on_puback_message_returned(self, channel, method, properties, body): 740 """Called as the result of Basic.Return from broker in 741 publisher-acknowledgements mode. Saves the info as a ReturnedMessage 742 instance in self._puback_return. 743 744 :param pika.Channel channel: our self._impl channel 745 :param pika.spec.Basic.Return method: 746 :param pika.spec.BasicProperties properties: message properties 747 :param bytes body: returned message body; empty string if no body 748 749 """ 750 assert isinstance(method, spec.Basic.Return), method 751 assert isinstance(properties, spec.BasicProperties), properties 752 753 LOGGER.warning( 754 "Published message was returned: _delivery_confirmation=%s; " 755 "channel=%s; method=%r; properties=%r; body_size=%d; " 756 "body_prefix=%.255r", self._delivery_confirmation, 757 channel.channel_number, method, properties, 758 len(body) if body is not None else None, body) 759 760 self._puback_return = ReceivedMessage(channel=self, 761 method=method, properties=properties, body=body) 762 763 def exchange_bind(self, destination, source, routing_key='', 764 arguments=None): 765 """Bind an exchange to another exchange. 766 767 :param str destination: The destination exchange to bind 768 :param str source: The source exchange to bind to 769 :param str routing_key: The routing key to bind on 770 :param dict arguments: Custom key/value pair arguments for the binding 771 :raises ValueError: 772 :returns: Deferred that fires on the Exchange.BindOk response 773 :rtype: Deferred 774 775 """ 776 return self._wrap_channel_method("exchange_bind")( 777 destination=destination, 778 source=source, 779 routing_key=routing_key, 780 arguments=arguments, 781 ) 782 783 def exchange_declare(self, 784 exchange, 785 exchange_type='direct', 786 passive=False, 787 durable=False, 788 auto_delete=False, 789 internal=False, 790 arguments=None): 791 """This method creates an exchange if it does not already exist, and if 792 the exchange exists, verifies that it is of the correct and expected 793 class. 794 795 If passive set, the server will reply with Declare-Ok if the exchange 796 already exists with the same name, and raise an error if not and if the 797 exchange does not already exist, the server MUST raise a channel 798 exception with reply code 404 (not found). 799 800 :param str exchange: The exchange name consists of a non-empty sequence 801 of these characters: letters, digits, hyphen, underscore, period, 802 or colon 803 :param str exchange_type: The exchange type to use 804 :param bool passive: Perform a declare or just check to see if it 805 exists 806 :param bool durable: Survive a reboot of RabbitMQ 807 :param bool auto_delete: Remove when no more queues are bound to it 808 :param bool internal: Can only be published to by other exchanges 809 :param dict arguments: Custom key/value pair arguments for the exchange 810 :returns: Deferred that fires on the Exchange.DeclareOk response 811 :rtype: Deferred 812 :raises ValueError: 813 814 """ 815 return self._wrap_channel_method("exchange_declare")( 816 exchange=exchange, 817 exchange_type=exchange_type, 818 passive=passive, 819 durable=durable, 820 auto_delete=auto_delete, 821 internal=internal, 822 arguments=arguments, 823 ) 824 825 def exchange_delete(self, exchange=None, if_unused=False): 826 """Delete the exchange. 827 828 :param str exchange: The exchange name 829 :param bool if_unused: only delete if the exchange is unused 830 :returns: Deferred that fires on the Exchange.DeleteOk response 831 :rtype: Deferred 832 :raises ValueError: 833 834 """ 835 return self._wrap_channel_method("exchange_delete")( 836 exchange=exchange, 837 if_unused=if_unused, 838 ) 839 840 def exchange_unbind(self, 841 destination=None, 842 source=None, 843 routing_key='', 844 arguments=None): 845 """Unbind an exchange from another exchange. 846 847 :param str destination: The destination exchange to unbind 848 :param str source: The source exchange to unbind from 849 :param str routing_key: The routing key to unbind 850 :param dict arguments: Custom key/value pair arguments for the binding 851 :returns: Deferred that fires on the Exchange.UnbindOk response 852 :rtype: Deferred 853 :raises ValueError: 854 855 """ 856 return self._wrap_channel_method("exchange_unbind")( 857 destination=destination, 858 source=source, 859 routing_key=routing_key, 860 arguments=arguments, 861 ) 862 863 def flow(self, active): 864 """Turn Channel flow control off and on. 865 866 Returns a Deferred that will fire with a bool indicating the channel 867 flow state. For more information, please reference: 868 869 http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow 870 871 :param bool active: Turn flow on or off 872 :returns: Deferred that fires with the channel flow state 873 :rtype: Deferred 874 :raises ValueError: 875 876 """ 877 return self._wrap_channel_method("flow")(active=active) 878 879 def open(self): 880 """Open the channel""" 881 return self._channel.open() 882 883 def queue_bind(self, queue, exchange, routing_key=None, arguments=None): 884 """Bind the queue to the specified exchange 885 886 :param str queue: The queue to bind to the exchange 887 :param str exchange: The source exchange to bind to 888 :param str routing_key: The routing key to bind on 889 :param dict arguments: Custom key/value pair arguments for the binding 890 :returns: Deferred that fires on the Queue.BindOk response 891 :rtype: Deferred 892 :raises ValueError: 893 894 """ 895 return self._wrap_channel_method("queue_bind")( 896 queue=queue, 897 exchange=exchange, 898 routing_key=routing_key, 899 arguments=arguments, 900 ) 901 902 def queue_declare(self, 903 queue, 904 passive=False, 905 durable=False, 906 exclusive=False, 907 auto_delete=False, 908 arguments=None): 909 """Declare queue, create if needed. This method creates or checks a 910 queue. When creating a new queue the client can specify various 911 properties that control the durability of the queue and its contents, 912 and the level of sharing for the queue. 913 914 Use an empty string as the queue name for the broker to auto-generate 915 one 916 917 :param str queue: The queue name; if empty string, the broker will 918 create a unique queue name 919 :param bool passive: Only check to see if the queue exists 920 :param bool durable: Survive reboots of the broker 921 :param bool exclusive: Only allow access by the current connection 922 :param bool auto_delete: Delete after consumer cancels or disconnects 923 :param dict arguments: Custom key/value arguments for the queue 924 :returns: Deferred that fires on the Queue.DeclareOk response 925 :rtype: Deferred 926 :raises ValueError: 927 928 """ 929 return self._wrap_channel_method("queue_declare")( 930 queue=queue, 931 passive=passive, 932 durable=durable, 933 exclusive=exclusive, 934 auto_delete=auto_delete, 935 arguments=arguments, 936 ) 937 938 def queue_delete(self, queue, if_unused=False, if_empty=False): 939 """Delete a queue from the broker. 940 941 942 This method wraps :meth:`Channel.queue_delete 943 <pika.channel.Channel.queue_delete>`, and removes the reference to the 944 queue object after it gets deleted on the server. 945 946 :param str queue: The queue to delete 947 :param bool if_unused: only delete if it's unused 948 :param bool if_empty: only delete if the queue is empty 949 :returns: Deferred that fires on the Queue.DeleteOk response 950 :rtype: Deferred 951 :raises ValueError: 952 953 """ 954 wrapped = self._wrap_channel_method('queue_delete') 955 d = wrapped(queue=queue, if_unused=if_unused, if_empty=if_empty) 956 957 def _clear_consumer(ret, queue_name): 958 for consumer_tag in list( 959 self._queue_name_to_consumer_tags.get(queue_name, set())): 960 self._consumers[consumer_tag].close( 961 exceptions.ConsumerCancelled( 962 "Queue %s was deleted." % queue_name)) 963 del self._consumers[consumer_tag] 964 self._queue_name_to_consumer_tags[queue_name].remove( 965 consumer_tag) 966 return ret 967 968 return d.addCallback(_clear_consumer, queue) 969 970 def queue_purge(self, queue): 971 """Purge all of the messages from the specified queue 972 973 :param str queue: The queue to purge 974 :returns: Deferred that fires on the Queue.PurgeOk response 975 :rtype: Deferred 976 :raises ValueError: 977 978 """ 979 return self._wrap_channel_method("queue_purge")(queue=queue) 980 981 def queue_unbind(self, 982 queue, 983 exchange=None, 984 routing_key=None, 985 arguments=None): 986 """Unbind a queue from an exchange. 987 988 :param str queue: The queue to unbind from the exchange 989 :param str exchange: The source exchange to bind from 990 :param str routing_key: The routing key to unbind 991 :param dict arguments: Custom key/value pair arguments for the binding 992 :returns: Deferred that fires on the Queue.UnbindOk response 993 :rtype: Deferred 994 :raises ValueError: 995 996 """ 997 return self._wrap_channel_method("queue_unbind")( 998 queue=queue, 999 exchange=exchange, 1000 routing_key=routing_key, 1001 arguments=arguments, 1002 ) 1003 1004 def tx_commit(self): 1005 """Commit a transaction. 1006 1007 :returns: Deferred that fires on the Tx.CommitOk response 1008 :rtype: Deferred 1009 :raises ValueError: 1010 1011 """ 1012 return self._wrap_channel_method("tx_commit")() 1013 1014 def tx_rollback(self): 1015 """Rollback a transaction. 1016 1017 :returns: Deferred that fires on the Tx.RollbackOk response 1018 :rtype: Deferred 1019 :raises ValueError: 1020 1021 """ 1022 return self._wrap_channel_method("tx_rollback")() 1023 1024 def tx_select(self): 1025 """Select standard transaction mode. This method sets the channel to use 1026 standard transactions. The client must use this method at least once on 1027 a channel before using the Commit or Rollback methods. 1028 1029 :returns: Deferred that fires on the Tx.SelectOk response 1030 :rtype: Deferred 1031 :raises ValueError: 1032 1033 """ 1034 return self._wrap_channel_method("tx_select")() 1035 1036 1037class _TwistedConnectionAdapter(pika.connection.Connection): 1038 """A Twisted-specific implementation of a Pika Connection. 1039 1040 NOTE: since `base_connection.BaseConnection`'s primary responsibility is 1041 management of the transport, we use `pika.connection.Connection` directly 1042 as our base class because this adapter uses a different transport 1043 management strategy. 1044 1045 """ 1046 1047 def __init__(self, parameters, on_open_callback, on_open_error_callback, 1048 on_close_callback, custom_reactor): 1049 super(_TwistedConnectionAdapter, self).__init__( 1050 parameters=parameters, 1051 on_open_callback=on_open_callback, 1052 on_open_error_callback=on_open_error_callback, 1053 on_close_callback=on_close_callback, 1054 internal_connection_workflow=False) 1055 1056 self._reactor = custom_reactor or reactor 1057 self._transport = None # to be provided by `connection_made()` 1058 1059 def _adapter_call_later(self, delay, callback): 1060 """Implement 1061 :py:meth:`pika.connection.Connection._adapter_call_later()`. 1062 1063 """ 1064 check_callback_arg(callback, 'callback') 1065 return _TimerHandle(self._reactor.callLater(delay, callback)) 1066 1067 def _adapter_remove_timeout(self, timeout_id): 1068 """Implement 1069 :py:meth:`pika.connection.Connection._adapter_remove_timeout()`. 1070 1071 """ 1072 timeout_id.cancel() 1073 1074 def _adapter_add_callback_threadsafe(self, callback): 1075 """Implement 1076 :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`. 1077 1078 """ 1079 check_callback_arg(callback, 'callback') 1080 self._reactor.callFromThread(callback) 1081 1082 def _adapter_connect_stream(self): 1083 """Implement pure virtual 1084 :py:ref:meth:`pika.connection.Connection._adapter_connect_stream()` 1085 method. 1086 1087 NOTE: This should not be called due to our initialization of Connection 1088 via `internal_connection_workflow=False` 1089 """ 1090 raise NotImplementedError 1091 1092 def _adapter_disconnect_stream(self): 1093 """Implement pure virtual 1094 :py:ref:meth:`pika.connection.Connection._adapter_disconnect_stream()` 1095 method. 1096 1097 """ 1098 self._transport.loseConnection() 1099 1100 def _adapter_emit_data(self, data): 1101 """Implement pure virtual 1102 :py:ref:meth:`pika.connection.Connection._adapter_emit_data()` method. 1103 1104 """ 1105 self._transport.write(data) 1106 1107 def connection_made(self, transport): 1108 """Introduces transport to protocol after transport is connected. 1109 1110 :param twisted.internet.interfaces.ITransport transport: 1111 :raises Exception: Exception-based exception on error 1112 1113 """ 1114 self._transport = transport 1115 # Let connection know that stream is available 1116 self._on_stream_connected() 1117 1118 def connection_lost(self, error): 1119 """Called upon loss or closing of TCP connection. 1120 1121 NOTE: `connection_made()` and `connection_lost()` are each called just 1122 once and in that order. All other callbacks are called between them. 1123 1124 :param Failure: A Twisted Failure instance wrapping an exception. 1125 1126 """ 1127 self._transport = None 1128 error = error.value # drop the Failure wrapper 1129 if isinstance(error, twisted_error.ConnectionDone): 1130 self._error = error 1131 error = None 1132 LOGGER.log(logging.DEBUG if error is None else logging.ERROR, 1133 'connection_lost: %r', error) 1134 self._on_stream_terminated(error) 1135 1136 def data_received(self, data): 1137 """Called to deliver incoming data from the server to the protocol. 1138 1139 :param data: Non-empty data bytes. 1140 :raises Exception: Exception-based exception on error 1141 1142 """ 1143 self._on_data_available(data) 1144 1145 1146class TwistedProtocolConnection(protocol.Protocol): 1147 """A Pika-specific implementation of a Twisted Protocol. Allows using 1148 Twisted's non-blocking connectTCP/connectSSL methods for connecting to the 1149 server. 1150 1151 TwistedProtocolConnection objects have a `ready` instance variable that's a 1152 Deferred which fires when the connection is ready to be used (the initial 1153 AMQP handshaking has been done). You *have* to wait for this Deferred to 1154 fire before requesting a channel. 1155 1156 Once the connection is ready, you will be able to use the `closed` instance 1157 variable: a Deferred which fires when the connection is closed. 1158 1159 Since it's Twisted handling connection establishing it does not accept 1160 connect callbacks, you have to implement that within Twisted. Also remember 1161 that the host, port and ssl values of the connection parameters are ignored 1162 because, yet again, it's Twisted who manages the connection. 1163 1164 """ 1165 1166 def __init__(self, parameters=None, custom_reactor=None): 1167 self.ready = defer.Deferred() 1168 self.ready.addCallback(lambda _: self.connectionReady()) 1169 self.closed = None 1170 self._impl = _TwistedConnectionAdapter( 1171 parameters=parameters, 1172 on_open_callback=self._on_connection_ready, 1173 on_open_error_callback=self._on_connection_failed, 1174 on_close_callback=self._on_connection_closed, 1175 custom_reactor=custom_reactor, 1176 ) 1177 1178 def channel(self, channel_number=None): # pylint: disable=W0221 1179 """Create a new channel with the next available channel number or pass 1180 in a channel number to use. Must be non-zero if you would like to 1181 specify but it is recommended that you let Pika manage the channel 1182 numbers. 1183 1184 :param int channel_number: The channel number to use, defaults to the 1185 next available. 1186 :returns: a Deferred that fires with an instance of a wrapper around 1187 the Pika Channel class. 1188 :rtype: Deferred 1189 1190 """ 1191 d = defer.Deferred() 1192 self._impl.channel(channel_number, d.callback) 1193 return d.addCallback(TwistedChannel) 1194 1195 @property 1196 def is_closed(self): 1197 # For compatibility with previous releases. 1198 return self._impl.is_closed 1199 1200 def close(self, reply_code=200, reply_text='Normal shutdown'): 1201 if not self._impl.is_closed: 1202 self._impl.close(reply_code, reply_text) 1203 return self.closed 1204 1205 # IProtocol methods 1206 1207 def dataReceived(self, data): 1208 # Pass the bytes to Pika for parsing 1209 self._impl.data_received(data) 1210 1211 def connectionLost(self, reason=protocol.connectionDone): 1212 self._impl.connection_lost(reason) 1213 # Let the caller know there's been an error 1214 d, self.ready = self.ready, None 1215 if d: 1216 d.errback(reason) 1217 1218 def makeConnection(self, transport): 1219 self._impl.connection_made(transport) 1220 protocol.Protocol.makeConnection(self, transport) 1221 1222 # Our own methods 1223 1224 def connectionReady(self): 1225 """This method will be called when the underlying connection is ready. 1226 """ 1227 1228 def _on_connection_ready(self, _connection): 1229 d, self.ready = self.ready, None 1230 if d: 1231 self.closed = defer.Deferred() 1232 d.callback(None) 1233 1234 def _on_connection_failed(self, _connection, _error_message=None): 1235 d, self.ready = self.ready, None 1236 if d: 1237 attempts = self._impl.params.connection_attempts 1238 exc = exceptions.AMQPConnectionError(attempts) 1239 d.errback(exc) 1240 1241 def _on_connection_closed(self, _connection, exception): 1242 d, self.closed = self.closed, None 1243 if d: 1244 if isinstance(exception, Failure): 1245 # Calling `callback` with a Failure instance will trigger the 1246 # errback path. 1247 exception = exception.value 1248 d.callback(exception) 1249 1250 1251class _TimerHandle(nbio_interface.AbstractTimerReference): 1252 """This module's adaptation of `nbio_interface.AbstractTimerReference`. 1253 1254 """ 1255 1256 def __init__(self, handle): 1257 """ 1258 1259 :param twisted.internet.base.DelayedCall handle: 1260 """ 1261 self._handle = handle 1262 1263 def cancel(self): 1264 if self._handle is not None: 1265 try: 1266 self._handle.cancel() 1267 except (twisted_error.AlreadyCalled, 1268 twisted_error.AlreadyCancelled): 1269 pass 1270 1271 self._handle = None 1272