1"""Using Pika with a Twisted reactor.
2
3The interfaces in this module are Deferred-based when possible. This means that
4the connection.channel() method and most of the channel methods return
5Deferreds instead of taking a callback argument and that basic_consume()
6returns a Twisted DeferredQueue where messages from the server will be
7stored. Refer to the docstrings for TwistedProtocolConnection.channel() and the
8TwistedChannel class for details.
9
10"""
11
12import functools
13import logging
14from collections import namedtuple
15
16from twisted.internet import (defer, error as twisted_error, reactor, protocol)
17from twisted.python.failure import Failure
18
19import pika.connection
20from pika import exceptions, spec
21from pika.adapters.utils import nbio_interface
22from pika.adapters.utils.io_services_utils import check_callback_arg
23
24# Twistisms
25# pylint: disable=C0111,C0103
26# Other
27# pylint: disable=too-many-lines
28
29LOGGER = logging.getLogger(__name__)
30
31
class ClosableDeferredQueue(defer.DeferredQueue):
    """A Twisted DeferredQueue that can be shut down with a reason.

    Once :meth:`close` has stored a reason, :meth:`get` and :meth:`put`
    stop touching the underlying queue and return a failed Deferred built
    from that reason; Deferreds that were already waiting on :meth:`get`
    are errbacked with it.
    """

    def __init__(self, size=None, backlog=None):
        # Holds the close reason; ``None`` means the queue is still open.
        self.closed = None
        super(ClosableDeferredQueue, self).__init__(size, backlog)

    def put(self, obj):
        """Add ``obj`` to the queue via :meth:`DeferredQueue.put`.

        If the queue has been closed, nothing is stored and a failed
        Deferred carrying the close reason is returned instead.

        """
        if not self.closed:
            return defer.DeferredQueue.put(self, obj)
        LOGGER.error('Impossible to put to the queue, it is closed.')
        return defer.fail(self.closed)

    def get(self):
        """Fetch the next item from the queue.

        :returns: the Deferred produced by :meth:`DeferredQueue.get`, or a
            failed Deferred carrying the close reason if the queue has been
            closed.
        :rtype: Deferred

        """
        if not self.closed:
            return defer.DeferredQueue.get(self)
        LOGGER.error('Impossible to get from the queue, it is closed.')
        return defer.fail(self.closed)

    def close(self, reason):
        """Close the queue with ``reason``.

        Every Deferred still waiting on :meth:`get` is errbacked with
        ``reason`` and any items not yet consumed are dropped. Closing an
        already closed queue logs a warning and replaces the stored reason.

        """
        previous_reason = self.closed
        if previous_reason:
            LOGGER.warning('Queue was already closed with reason: %s.',
                           previous_reason)
        self.closed = reason
        while self.waiting:
            waiter = self.waiting.pop()
            waiter.errback(reason)
        self.pending = []
83
84
# Message record handed to users: the TwistedChannel it arrived on, the
# AMQP method frame, the message properties and the raw body.
ReceivedMessage = namedtuple("ReceivedMessage",
                             "channel method properties body")
87
88
89class TwistedChannel(object):
90    """A wrapper around Pika's Channel.
91
92    Channel methods that normally take a callback argument are wrapped to
93    return a Deferred that fires with whatever would be passed to the callback.
94    If the channel gets closed, all pending Deferreds are errbacked with a
95    ChannelClosed exception. The returned Deferreds fire with whatever
96    arguments the callback to the original method would receive.
97
98    Some methods like basic_consume and basic_get are wrapped in a special way,
99    see their docstrings for details.
100    """
101
102    def __init__(self, channel):
103        self._channel = channel
104        self._closed = None
105        self._calls = set()
106        self._consumers = {}
107        # Store Basic.Get calls so we can handle GetEmpty replies
108        self._basic_get_deferred = None
109        self._channel.add_callback(self._on_getempty, [spec.Basic.GetEmpty],
110                                   False)
111        # We need this mapping to close the ClosableDeferredQueue when a queue
112        # is deleted.
113        self._queue_name_to_consumer_tags = {}
114        # Whether RabbitMQ delivery confirmation has been enabled
115        self._delivery_confirmation = False
116        self._delivery_message_id = None
117        self._deliveries = {}
118        # Holds a ReceivedMessage object representing a message received via
119        # Basic.Return in publisher-acknowledgments mode.
120        self._puback_return = None
121
122        self.on_closed = defer.Deferred()
123        self._channel.add_on_close_callback(self._on_channel_closed)
124        self._channel.add_on_cancel_callback(
125            self._on_consumer_cancelled_by_broker)
126
127    def __repr__(self):
128        return '<{cls} channel={chan!r}>'.format(
129            cls=self.__class__.__name__, chan=self._channel)
130
131    def _on_channel_closed(self, _channel, reason):
132        # enter the closed state
133        self._closed = reason
134        # errback all pending calls
135        for d in self._calls:
136            d.errback(self._closed)
137        # close all open queues
138        for consumer in self._consumers.values():
139            consumer.close(self._closed)
140        # release references to stored objects
141        self._calls = set()
142        self._consumers = {}
143        self.on_closed.callback(self._closed)
144
145    def _on_consumer_cancelled_by_broker(self, method_frame):
146        """Called by impl when broker cancels consumer via Basic.Cancel.
147
148        This is a RabbitMQ-specific feature. The circumstances include deletion
149        of queue being consumed as well as failure of a HA node responsible for
150        the queue being consumed.
151
152        :param pika.frame.Method method_frame: method frame with the
153            `spec.Basic.Cancel` method
154
155        """
156        return self._on_consumer_cancelled(method_frame)
157
158    def _on_consumer_cancelled(self, frame):
159        """Called when the broker cancels a consumer via Basic.Cancel or when
160        the broker responds to a Basic.Cancel request by Basic.CancelOk.
161
162        :param pika.frame.Method frame: method frame with the
163            `spec.Basic.Cancel` or `spec.Basic.CancelOk` method
164
165        """
166        consumer_tag = frame.method.consumer_tag
167        if consumer_tag not in self._consumers:
168            # Could be cancelled by user or broker earlier
169            LOGGER.warning('basic_cancel - consumer not found: %s',
170                           consumer_tag)
171            return frame
172        self._consumers[consumer_tag].close(exceptions.ConsumerCancelled())
173        del self._consumers[consumer_tag]
174        # Remove from the queue-to-ctags index:
175        for ctags in self._queue_name_to_consumer_tags.values():
176            try:
177                ctags.remove(consumer_tag)
178            except KeyError:
179                continue
180        return frame
181
182    def _on_getempty(self, _method_frame):
183        """Callback the Basic.Get deferred with None.
184        """
185        if self._basic_get_deferred is None:
186            LOGGER.warning("Got Basic.GetEmpty but no Basic.Get calls "
187                           "were pending.")
188            return
189        self._basic_get_deferred.callback(None)
190
191    def _wrap_channel_method(self, name):
192        """Wrap Pika's Channel method to make it return a Deferred that fires
193        when the method completes and errbacks if the channel gets closed. If
194        the original method's callback would receive more than one argument,
195        the Deferred fires with a tuple of argument values.
196
197        """
198        method = getattr(self._channel, name)
199
200        @functools.wraps(method)
201        def wrapped(*args, **kwargs):
202            if self._closed:
203                return defer.fail(self._closed)
204
205            d = defer.Deferred()
206            self._calls.add(d)
207            d.addCallback(self._clear_call, d)
208
209            def single_argument(*args):
210                """
211                Make sure that the deferred is called with a single argument.
212                In case the original callback fires with more than one, convert
213                to a tuple.
214                """
215                if len(args) > 1:
216                    d.callback(tuple(args))
217                else:
218                    d.callback(*args)
219
220            kwargs['callback'] = single_argument
221
222            try:
223                method(*args, **kwargs)
224            except Exception:  # pylint: disable=W0703
225                return defer.fail()
226            return d
227
228        return wrapped
229
230    def _clear_call(self, ret, d):
231        self._calls.discard(d)
232        return ret
233
234    # Public Channel attributes
235
236    @property
237    def channel_number(self):
238        return self._channel.channel_number
239
240    @property
241    def connection(self):
242        return self._channel.connection
243
244    @property
245    def is_closed(self):
246        """Returns True if the channel is closed.
247
248        :rtype: bool
249
250        """
251        return self._channel.is_closed
252
253    @property
254    def is_closing(self):
255        """Returns True if client-initiated closing of the channel is in
256        progress.
257
258        :rtype: bool
259
260        """
261        return self._channel.is_closing
262
263    @property
264    def is_open(self):
265        """Returns True if the channel is open.
266
267        :rtype: bool
268
269        """
270        return self._channel.is_open
271
272    @property
273    def flow_active(self):
274        return self._channel.flow_active
275
276    @property
277    def consumer_tags(self):
278        return self._channel.consumer_tags
279
280    # Deferred-equivalents of public Channel methods
281
282    def callback_deferred(self, deferred, replies):
283        """Pass in a Deferred and a list replies from the RabbitMQ broker which
284        you'd like the Deferred to be callbacked on with the frame as callback
285        value.
286
287        :param Deferred deferred: The Deferred to callback
288        :param list replies: The replies to callback on
289
290        """
291        self._channel.add_callback(deferred.callback, replies)
292
293    # Public Channel methods
294
295    def add_on_return_callback(self, callback):
296        """Pass a callback function that will be called when a published
297        message is rejected and returned by the server via `Basic.Return`.
298
299        :param callable callback: The method to call on callback with the
300            message as only argument. The message is a named tuple with
301            the following attributes:
302            channel: this TwistedChannel
303            method: pika.spec.Basic.Return
304            properties: pika.spec.BasicProperties
305            body: bytes
306
307        """
308        self._channel.add_on_return_callback(
309            lambda _channel, method, properties, body: callback(
310                ReceivedMessage(
311                    channel=self,
312                    method=method,
313                    properties=properties,
314                    body=body,
315                )
316            )
317        )
318
319    def basic_ack(self, delivery_tag=0, multiple=False):
320        """Acknowledge one or more messages. When sent by the client, this
321        method acknowledges one or more messages delivered via the Deliver or
322        Get-Ok methods. When sent by server, this method acknowledges one or
323        more messages published with the Publish method on a channel in
324        confirm mode. The acknowledgement can be for a single message or a
325        set of messages up to and including a specific message.
326
327        :param integer delivery_tag: int/long The server-assigned delivery tag
328        :param bool multiple: If set to True, the delivery tag is treated as
329                              "up to and including", so that multiple messages
330                              can be acknowledged with a single method. If set
331                              to False, the delivery tag refers to a single
332                              message. If the multiple field is 1, and the
333                              delivery tag is zero, this indicates
334                              acknowledgement of all outstanding messages.
335
336        """
337        return self._channel.basic_ack(
338            delivery_tag=delivery_tag, multiple=multiple)
339
340    def basic_cancel(self, consumer_tag=''):
341        """This method cancels a consumer. This does not affect already
342        delivered messages, but it does mean the server will not send any more
343        messages for that consumer. The client may receive an arbitrary number
344        of messages in between sending the cancel method and receiving the
345        cancel-ok reply. It may also be sent from the server to the client in
346        the event of the consumer being unexpectedly cancelled (i.e. cancelled
347        for any reason other than the server receiving the corresponding
348        basic.cancel from the client). This allows clients to be notified of
349        the loss of consumers due to events such as queue deletion.
350
351        This method wraps :meth:`Channel.basic_cancel
352        <pika.channel.Channel.basic_cancel>` and closes any deferred queue
353        associated with that consumer.
354
355        :param str consumer_tag: Identifier for the consumer
356        :returns: Deferred that fires on the Basic.CancelOk response
357        :rtype: Deferred
358        :raises ValueError:
359
360        """
361        wrapped = self._wrap_channel_method('basic_cancel')
362        d = wrapped(consumer_tag=consumer_tag)
363        return d.addCallback(self._on_consumer_cancelled)
364
365    def basic_consume(self,
366                      queue,
367                      auto_ack=False,
368                      exclusive=False,
369                      consumer_tag=None,
370                      arguments=None):
371        """Consume from a server queue.
372
373        Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds
374        messages for the consumer_tag to a
375        :class:`ClosableDeferredQueue`. If you do not pass in a
376        consumer_tag, one will be automatically generated for you.
377
378        For more information on basic_consume, see:
379        Tutorial 2 at http://www.rabbitmq.com/getstarted.html
380        http://www.rabbitmq.com/confirms.html
381        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
382
383        :param str queue: The queue to consume from. Use the empty string to
384            specify the most recent server-named queue for this channel.
385        :param bool auto_ack: if set to True, automatic acknowledgement mode
386            will be used (see http://www.rabbitmq.com/confirms.html). This
387            corresponds with the 'no_ack' parameter in the basic.consume AMQP
388            0.9.1 method
389        :param bool exclusive: Don't allow other consumers on the queue
390        :param str consumer_tag: Specify your own consumer tag
391        :param dict arguments: Custom key/value pair arguments for the consumer
392        :returns: Deferred that fires with a tuple
393            ``(queue_object, consumer_tag)``. The Deferred will errback with an
394            instance of :class:`exceptions.ChannelClosed` if the call fails.
395            The queue object is an instance of :class:`ClosableDeferredQueue`,
396            where data received from the queue will be stored. Clients should
397            use its :meth:`get() <ClosableDeferredQueue.get>` method to fetch
398            an individual message, which will return a Deferred firing with a
399            namedtuple whose attributes are:
400             - channel: this TwistedChannel
401             - method: pika.spec.Basic.Deliver
402             - properties: pika.spec.BasicProperties
403             - body: bytes
404        :rtype: Deferred
405
406        """
407        if self._closed:
408            return defer.fail(self._closed)
409
410        queue_obj = ClosableDeferredQueue()
411        d = defer.Deferred()
412        self._calls.add(d)
413
414        def on_consume_ok(frame):
415            consumer_tag = frame.method.consumer_tag
416            self._queue_name_to_consumer_tags.setdefault(
417                queue, set()).add(consumer_tag)
418            self._consumers[consumer_tag] = queue_obj
419            self._calls.discard(d)
420            d.callback((queue_obj, consumer_tag))
421
422        def on_message_callback(_channel, method, properties, body):
423            """Add the ReceivedMessage to the queue, while replacing the
424            channel implementation.
425            """
426            queue_obj.put(
427                ReceivedMessage(
428                    channel=self,
429                    method=method,
430                    properties=properties,
431                    body=body,
432                ))
433
434        try:
435            self._channel.basic_consume(
436                queue=queue,
437                on_message_callback=on_message_callback,
438                auto_ack=auto_ack,
439                exclusive=exclusive,
440                consumer_tag=consumer_tag,
441                arguments=arguments,
442                callback=on_consume_ok,
443            )
444        except Exception:  # pylint: disable=W0703
445            return defer.fail()
446
447        return d
448
449    def basic_get(self, queue, auto_ack=False):
450        """Get a single message from the AMQP broker.
451
452        Will return If the queue is empty, it will return None.
453        If you want to
454        be notified of Basic.GetEmpty, use the Channel.add_callback method
455        adding your Basic.GetEmpty callback which should expect only one
456        parameter, frame. Due to implementation details, this cannot be called
457        a second time until the callback is executed.  For more information on
458        basic_get and its parameters, see:
459
460        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
461
462        This method wraps :meth:`Channel.basic_get
463        <pika.channel.Channel.basic_get>`.
464
465        :param str queue: The queue from which to get a message. Use the empty
466                      string to specify the most recent server-named queue
467                      for this channel.
468        :param bool auto_ack: Tell the broker to not expect a reply
469        :returns: Deferred that fires with a namedtuple whose attributes are:
470            channel: this TwistedChannel
471            method: pika.spec.Basic.GetOk
472            properties: pika.spec.BasicProperties
473            body: bytes
474            If the queue is empty, None will be returned.
475        :rtype: Deferred
476        :raises pika.exceptions.DuplicateGetOkCallback:
477
478        """
479        if self._basic_get_deferred is not None:
480            raise exceptions.DuplicateGetOkCallback()
481
482        def create_namedtuple(result):
483            if result is None:
484                return None
485            _channel, method, properties, body = result
486            return ReceivedMessage(
487                channel=self,
488                method=method,
489                properties=properties,
490                body=body,
491            )
492
493        def cleanup_attribute(result):
494            self._basic_get_deferred = None
495            return result
496
497        d = self._wrap_channel_method("basic_get")(
498            queue=queue, auto_ack=auto_ack)
499        d.addCallback(create_namedtuple)
500        d.addBoth(cleanup_attribute)
501        self._basic_get_deferred = d
502        return d
503
504    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
505        """This method allows a client to reject one or more incoming messages.
506        It can be used to interrupt and cancel large incoming messages, or
507        return untreatable messages to their original queue.
508
509        :param integer delivery-tag: int/long The server-assigned delivery tag
510        :param bool multiple: If set to True, the delivery tag is treated as
511                              "up to and including", so that multiple messages
512                              can be acknowledged with a single method. If set
513                              to False, the delivery tag refers to a single
514                              message. If the multiple field is 1, and the
515                              delivery tag is zero, this indicates
516                              acknowledgement of all outstanding messages.
517        :param bool requeue: If requeue is true, the server will attempt to
518                             requeue the message. If requeue is false or the
519                             requeue attempt fails the messages are discarded
520                             or dead-lettered.
521
522        """
523        return self._channel.basic_nack(
524            delivery_tag=delivery_tag,
525            multiple=multiple,
526            requeue=requeue,
527        )
528
529    def basic_publish(self,
530                      exchange,
531                      routing_key,
532                      body,
533                      properties=None,
534                      mandatory=False):
535        """Publish to the channel with the given exchange, routing key and body.
536
537        This method wraps :meth:`Channel.basic_publish
538        <pika.channel.Channel.basic_publish>`, but makes sure the channel is
539        not closed before publishing.
540
541        For more information on basic_publish and what the parameters do, see:
542
543        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
544
545        :param str exchange: The exchange to publish to
546        :param str routing_key: The routing key to bind on
547        :param bytes body: The message body
548        :param pika.spec.BasicProperties properties: Basic.properties
549        :param bool mandatory: The mandatory flag
550        :returns: A Deferred that fires with the result of the channel's
551            basic_publish.
552        :rtype: Deferred
553        :raises UnroutableError: raised when a message published in
554            publisher-acknowledgments mode (see
555            `BlockingChannel.confirm_delivery`) is returned via `Basic.Return`
556            followed by `Basic.Ack`.
557        :raises NackError: raised when a message published in
558            publisher-acknowledgements mode is Nack'ed by the broker. See
559            `BlockingChannel.confirm_delivery`.
560
561        """
562        if self._closed:
563            return defer.fail(self._closed)
564        result = self._channel.basic_publish(
565            exchange=exchange,
566            routing_key=routing_key,
567            body=body,
568            properties=properties,
569            mandatory=mandatory)
570        if not self._delivery_confirmation:
571            return defer.succeed(result)
572        else:
573            # See http://www.rabbitmq.com/confirms.html#publisher-confirms
574            self._delivery_message_id += 1
575            self._deliveries[self._delivery_message_id] = defer.Deferred()
576            return self._deliveries[self._delivery_message_id]
577
578    def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
579        """Specify quality of service. This method requests a specific quality
580        of service. The QoS can be specified for the current channel or for all
581        channels on the connection. The client can request that messages be
582        sent in advance so that when the client finishes processing a message,
583        the following message is already held locally, rather than needing to
584        be sent down the channel. Prefetching gives a performance improvement.
585
586        :param int prefetch_size:  This field specifies the prefetch window
587                                   size. The server will send a message in
588                                   advance if it is equal to or smaller in size
589                                   than the available prefetch size (and also
590                                   falls into other prefetch limits). May be
591                                   set to zero, meaning "no specific limit",
592                                   although other prefetch limits may still
593                                   apply. The prefetch-size is ignored by
594                                   consumers who have enabled the no-ack
595                                   option.
596        :param int prefetch_count: Specifies a prefetch window in terms of
597                                   whole messages. This field may be used in
598                                   combination with the prefetch-size field; a
599                                   message will only be sent in advance if both
600                                   prefetch windows (and those at the channel
601                                   and connection level) allow it. The
602                                   prefetch-count is ignored by consumers who
603                                   have enabled the no-ack option.
604        :param bool global_qos:    Should the QoS apply to all channels on the
605                                   connection.
606        :returns: Deferred that fires on the Basic.QosOk response
607        :rtype: Deferred
608
609        """
610        return self._wrap_channel_method("basic_qos")(
611            prefetch_size=prefetch_size,
612            prefetch_count=prefetch_count,
613            global_qos=global_qos,
614        )
615
616    def basic_reject(self, delivery_tag, requeue=True):
617        """Reject an incoming message. This method allows a client to reject a
618        message. It can be used to interrupt and cancel large incoming
619        messages, or return untreatable messages to their original queue.
620
621        :param integer delivery_tag: int/long The server-assigned delivery tag
622        :param bool requeue: If requeue is true, the server will attempt to
623                             requeue the message. If requeue is false or the
624                             requeue attempt fails the messages are discarded
625                             or dead-lettered.
626        :raises: TypeError
627
628        """
629        return self._channel.basic_reject(
630            delivery_tag=delivery_tag, requeue=requeue)
631
632    def basic_recover(self, requeue=False):
633        """This method asks the server to redeliver all unacknowledged messages
634        on a specified channel. Zero or more messages may be redelivered. This
635        method replaces the asynchronous Recover.
636
637        :param bool requeue: If False, the message will be redelivered to the
638                             original recipient. If True, the server will
639                             attempt to requeue the message, potentially then
640                             delivering it to an alternative subscriber.
641        :returns: Deferred that fires on the Basic.RecoverOk response
642        :rtype: Deferred
643
644        """
645        return self._wrap_channel_method("basic_recover")(requeue=requeue)
646
647    def close(self, reply_code=0, reply_text="Normal shutdown"):
648        """Invoke a graceful shutdown of the channel with the AMQP Broker.
649
650        If channel is OPENING, transition to CLOSING and suppress the incoming
651        Channel.OpenOk, if any.
652
653        :param int reply_code: The reason code to send to broker
654        :param str reply_text: The reason text to send to broker
655
656        :raises ChannelWrongStateError: if channel is closed or closing
657
658        """
659        return self._channel.close(reply_code=reply_code, reply_text=reply_text)
660
661    def confirm_delivery(self):
662        """Turn on Confirm mode in the channel. Pass in a callback to be
663        notified by the Broker when a message has been confirmed as received or
664        rejected (Basic.Ack, Basic.Nack) from the broker to the publisher.
665
666        For more information see:
667            http://www.rabbitmq.com/confirms.html#publisher-confirms
668
669        :returns: Deferred that fires on the Confirm.SelectOk response
670        :rtype: Deferred
671
672        """
673        if self._delivery_confirmation:
674            LOGGER.error('confirm_delivery: confirmation was already enabled.')
675            return defer.succeed(None)
676        wrapped = self._wrap_channel_method('confirm_delivery')
677        d = wrapped(ack_nack_callback=self._on_delivery_confirmation)
678
679        def set_delivery_confirmation(result):
680            self._delivery_confirmation = True
681            self._delivery_message_id = 0
682            LOGGER.debug("Delivery confirmation enabled.")
683            return result
684
685        d.addCallback(set_delivery_confirmation)
686        # Unroutable messages returned after this point will be in the context
687        # of publisher acknowledgments
688        self._channel.add_on_return_callback(self._on_puback_message_returned)
689        return d
690
691    def _on_delivery_confirmation(self, method_frame):
692        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
693        command, passing in either a Basic.Ack or Basic.Nack frame with
694        the delivery tag of the message that was published. The delivery tag
695        is an integer counter indicating the message number that was sent
696        on the channel via Basic.Publish. Here we're just doing house keeping
697        to keep track of stats and remove message numbers that we expect
698        a delivery confirmation of from the list used to keep track of messages
699        that are pending confirmation.
700
701        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
702
703        """
704        delivery_tag = method_frame.method.delivery_tag
705        if delivery_tag not in self._deliveries:
706            LOGGER.error("Delivery tag %s not found in the pending deliveries",
707                         delivery_tag)
708            return
709        if method_frame.method.multiple:
710            tags = [tag for tag in self._deliveries if tag <= delivery_tag]
711            tags.sort()
712        else:
713            tags = [delivery_tag]
714        for tag in tags:
715            d = self._deliveries[tag]
716            del self._deliveries[tag]
717            if isinstance(method_frame.method, pika.spec.Basic.Nack):
718                # Broker was unable to process the message due to internal
719                # error
720                LOGGER.warning(
721                    "Message was Nack'ed by broker: nack=%r; channel=%s;",
722                    method_frame.method, self.channel_number)
723                if self._puback_return is not None:
724                    returned_messages = [self._puback_return]
725                    self._puback_return = None
726                else:
727                    returned_messages = []
728                d.errback(exceptions.NackError(returned_messages))
729            else:
730                assert isinstance(method_frame.method, pika.spec.Basic.Ack)
731                if self._puback_return is not None:
732                    # Unroutable message was returned
733                    returned_messages = [self._puback_return]
734                    self._puback_return = None
735                    d.errback(exceptions.UnroutableError(returned_messages))
736                else:
737                    d.callback(method_frame.method)
738
739    def _on_puback_message_returned(self, channel, method, properties, body):
740        """Called as the result of Basic.Return from broker in
741        publisher-acknowledgements mode. Saves the info as a ReturnedMessage
742        instance in self._puback_return.
743
744        :param pika.Channel channel: our self._impl channel
745        :param pika.spec.Basic.Return method:
746        :param pika.spec.BasicProperties properties: message properties
747        :param bytes body: returned message body; empty string if no body
748
749        """
750        assert isinstance(method, spec.Basic.Return), method
751        assert isinstance(properties, spec.BasicProperties), properties
752
753        LOGGER.warning(
754            "Published message was returned: _delivery_confirmation=%s; "
755            "channel=%s; method=%r; properties=%r; body_size=%d; "
756            "body_prefix=%.255r", self._delivery_confirmation,
757            channel.channel_number, method, properties,
758            len(body) if body is not None else None, body)
759
760        self._puback_return = ReceivedMessage(channel=self,
761                method=method, properties=properties, body=body)
762
763    def exchange_bind(self, destination, source, routing_key='',
764                      arguments=None):
765        """Bind an exchange to another exchange.
766
767        :param str destination: The destination exchange to bind
768        :param str source: The source exchange to bind to
769        :param str routing_key: The routing key to bind on
770        :param dict arguments: Custom key/value pair arguments for the binding
771        :raises ValueError:
772        :returns: Deferred that fires on the Exchange.BindOk response
773        :rtype: Deferred
774
775        """
776        return self._wrap_channel_method("exchange_bind")(
777            destination=destination,
778            source=source,
779            routing_key=routing_key,
780            arguments=arguments,
781        )
782
783    def exchange_declare(self,
784                         exchange,
785                         exchange_type='direct',
786                         passive=False,
787                         durable=False,
788                         auto_delete=False,
789                         internal=False,
790                         arguments=None):
791        """This method creates an exchange if it does not already exist, and if
792        the exchange exists, verifies that it is of the correct and expected
793        class.
794
795        If passive set, the server will reply with Declare-Ok if the exchange
796        already exists with the same name, and raise an error if not and if the
797        exchange does not already exist, the server MUST raise a channel
798        exception with reply code 404 (not found).
799
800        :param str exchange: The exchange name consists of a non-empty sequence
801            of these characters: letters, digits, hyphen, underscore, period,
802            or colon
803        :param str exchange_type: The exchange type to use
804        :param bool passive: Perform a declare or just check to see if it
805            exists
806        :param bool durable: Survive a reboot of RabbitMQ
807        :param bool auto_delete: Remove when no more queues are bound to it
808        :param bool internal: Can only be published to by other exchanges
809        :param dict arguments: Custom key/value pair arguments for the exchange
810        :returns: Deferred that fires on the Exchange.DeclareOk response
811        :rtype: Deferred
812        :raises ValueError:
813
814        """
815        return self._wrap_channel_method("exchange_declare")(
816            exchange=exchange,
817            exchange_type=exchange_type,
818            passive=passive,
819            durable=durable,
820            auto_delete=auto_delete,
821            internal=internal,
822            arguments=arguments,
823        )
824
825    def exchange_delete(self, exchange=None, if_unused=False):
826        """Delete the exchange.
827
828        :param str exchange: The exchange name
829        :param bool if_unused: only delete if the exchange is unused
830        :returns: Deferred that fires on the Exchange.DeleteOk response
831        :rtype: Deferred
832        :raises ValueError:
833
834        """
835        return self._wrap_channel_method("exchange_delete")(
836            exchange=exchange,
837            if_unused=if_unused,
838        )
839
840    def exchange_unbind(self,
841                        destination=None,
842                        source=None,
843                        routing_key='',
844                        arguments=None):
845        """Unbind an exchange from another exchange.
846
847        :param str destination: The destination exchange to unbind
848        :param str source: The source exchange to unbind from
849        :param str routing_key: The routing key to unbind
850        :param dict arguments: Custom key/value pair arguments for the binding
851        :returns: Deferred that fires on the Exchange.UnbindOk response
852        :rtype: Deferred
853        :raises ValueError:
854
855        """
856        return self._wrap_channel_method("exchange_unbind")(
857            destination=destination,
858            source=source,
859            routing_key=routing_key,
860            arguments=arguments,
861        )
862
863    def flow(self, active):
864        """Turn Channel flow control off and on.
865
866        Returns a Deferred that will fire with a bool indicating the channel
867        flow state. For more information, please reference:
868
869        http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
870
871        :param bool active: Turn flow on or off
872        :returns: Deferred that fires with the channel flow state
873        :rtype: Deferred
874        :raises ValueError:
875
876        """
877        return self._wrap_channel_method("flow")(active=active)
878
879    def open(self):
880        """Open the channel"""
881        return self._channel.open()
882
883    def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
884        """Bind the queue to the specified exchange
885
886        :param str queue: The queue to bind to the exchange
887        :param str exchange: The source exchange to bind to
888        :param str routing_key: The routing key to bind on
889        :param dict arguments: Custom key/value pair arguments for the binding
890        :returns: Deferred that fires on the Queue.BindOk response
891        :rtype: Deferred
892        :raises ValueError:
893
894        """
895        return self._wrap_channel_method("queue_bind")(
896            queue=queue,
897            exchange=exchange,
898            routing_key=routing_key,
899            arguments=arguments,
900        )
901
902    def queue_declare(self,
903                      queue,
904                      passive=False,
905                      durable=False,
906                      exclusive=False,
907                      auto_delete=False,
908                      arguments=None):
909        """Declare queue, create if needed. This method creates or checks a
910        queue. When creating a new queue the client can specify various
911        properties that control the durability of the queue and its contents,
912        and the level of sharing for the queue.
913
914        Use an empty string as the queue name for the broker to auto-generate
915        one
916
917        :param str queue: The queue name; if empty string, the broker will
918            create a unique queue name
919        :param bool passive: Only check to see if the queue exists
920        :param bool durable: Survive reboots of the broker
921        :param bool exclusive: Only allow access by the current connection
922        :param bool auto_delete: Delete after consumer cancels or disconnects
923        :param dict arguments: Custom key/value arguments for the queue
924        :returns: Deferred that fires on the Queue.DeclareOk response
925        :rtype: Deferred
926        :raises ValueError:
927
928        """
929        return self._wrap_channel_method("queue_declare")(
930            queue=queue,
931            passive=passive,
932            durable=durable,
933            exclusive=exclusive,
934            auto_delete=auto_delete,
935            arguments=arguments,
936        )
937
938    def queue_delete(self, queue, if_unused=False, if_empty=False):
939        """Delete a queue from the broker.
940
941
942        This method wraps :meth:`Channel.queue_delete
943        <pika.channel.Channel.queue_delete>`, and removes the reference to the
944        queue object after it gets deleted on the server.
945
946        :param str queue: The queue to delete
947        :param bool if_unused: only delete if it's unused
948        :param bool if_empty: only delete if the queue is empty
949        :returns: Deferred that fires on the Queue.DeleteOk response
950        :rtype: Deferred
951        :raises ValueError:
952
953        """
954        wrapped = self._wrap_channel_method('queue_delete')
955        d = wrapped(queue=queue, if_unused=if_unused, if_empty=if_empty)
956
957        def _clear_consumer(ret, queue_name):
958            for consumer_tag in list(
959                    self._queue_name_to_consumer_tags.get(queue_name, set())):
960                self._consumers[consumer_tag].close(
961                    exceptions.ConsumerCancelled(
962                        "Queue %s was deleted." % queue_name))
963                del self._consumers[consumer_tag]
964                self._queue_name_to_consumer_tags[queue_name].remove(
965                    consumer_tag)
966            return ret
967
968        return d.addCallback(_clear_consumer, queue)
969
970    def queue_purge(self, queue):
971        """Purge all of the messages from the specified queue
972
973        :param str queue: The queue to purge
974        :returns: Deferred that fires on the Queue.PurgeOk response
975        :rtype: Deferred
976        :raises ValueError:
977
978        """
979        return self._wrap_channel_method("queue_purge")(queue=queue)
980
981    def queue_unbind(self,
982                     queue,
983                     exchange=None,
984                     routing_key=None,
985                     arguments=None):
986        """Unbind a queue from an exchange.
987
988        :param str queue: The queue to unbind from the exchange
989        :param str exchange: The source exchange to bind from
990        :param str routing_key: The routing key to unbind
991        :param dict arguments: Custom key/value pair arguments for the binding
992        :returns: Deferred that fires on the Queue.UnbindOk response
993        :rtype: Deferred
994        :raises ValueError:
995
996        """
997        return self._wrap_channel_method("queue_unbind")(
998            queue=queue,
999            exchange=exchange,
1000            routing_key=routing_key,
1001            arguments=arguments,
1002        )
1003
1004    def tx_commit(self):
1005        """Commit a transaction.
1006
1007        :returns: Deferred that fires on the Tx.CommitOk response
1008        :rtype: Deferred
1009        :raises ValueError:
1010
1011        """
1012        return self._wrap_channel_method("tx_commit")()
1013
1014    def tx_rollback(self):
1015        """Rollback a transaction.
1016
1017        :returns: Deferred that fires on the Tx.RollbackOk response
1018        :rtype: Deferred
1019        :raises ValueError:
1020
1021        """
1022        return self._wrap_channel_method("tx_rollback")()
1023
1024    def tx_select(self):
1025        """Select standard transaction mode. This method sets the channel to use
1026        standard transactions. The client must use this method at least once on
1027        a channel before using the Commit or Rollback methods.
1028
1029        :returns: Deferred that fires on the Tx.SelectOk response
1030        :rtype: Deferred
1031        :raises ValueError:
1032
1033        """
1034        return self._wrap_channel_method("tx_select")()
1035
1036
class _TwistedConnectionAdapter(pika.connection.Connection):
    """Pika Connection implementation driven by a Twisted reactor.

    NOTE: since `base_connection.BaseConnection`'s primary responsibility is
    management of the transport, we use `pika.connection.Connection` directly
    as our base class because this adapter uses a different transport
    management strategy.

    """

    def __init__(self, parameters, on_open_callback, on_open_error_callback,
                 on_close_callback, custom_reactor):
        """Initialise the base Connection with the internal connection
        workflow disabled and remember which reactor to use.

        :param parameters: passed through to the base Connection
        :param on_open_callback: passed through to the base Connection
        :param on_open_error_callback: passed through to the base Connection
        :param on_close_callback: passed through to the base Connection
        :param custom_reactor: reactor to use; the global Twisted reactor is
            used when this is falsy

        """
        super(_TwistedConnectionAdapter, self).__init__(
            parameters=parameters,
            on_open_callback=on_open_callback,
            on_open_error_callback=on_open_error_callback,
            on_close_callback=on_close_callback,
            internal_connection_workflow=False)

        # Set by `connection_made()` and cleared by `connection_lost()`.
        self._transport = None
        self._reactor = custom_reactor if custom_reactor else reactor

    def _adapter_call_later(self, delay, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_call_later()`.

        The reactor's delayed call is returned wrapped in a `_TimerHandle`.

        """
        check_callback_arg(callback, 'callback')
        delayed_call = self._reactor.callLater(delay, callback)
        return _TimerHandle(delayed_call)

    def _adapter_remove_timeout(self, timeout_id):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_remove_timeout()`.

        Cancels the given timer by calling its `cancel()` method.

        """
        timeout_id.cancel()

    def _adapter_add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.

        The callback is handed to the reactor's `callFromThread()`.

        """
        check_callback_arg(callback, 'callback')
        self._reactor.callFromThread(callback)

    def _adapter_connect_stream(self):
        """Implement pure virtual
        :py:meth:`pika.connection.Connection._adapter_connect_stream()`
        method.

        NOTE: This should not be called due to our initialization of Connection
        via `internal_connection_workflow=False`
        """
        raise NotImplementedError

    def _adapter_disconnect_stream(self):
        """Implement pure virtual
        :py:meth:`pika.connection.Connection._adapter_disconnect_stream()`
        method by asking the transport to lose the connection.

        """
        self._transport.loseConnection()

    def _adapter_emit_data(self, data):
        """Implement pure virtual
        :py:meth:`pika.connection.Connection._adapter_emit_data()` method by
        writing the data to the transport.

        """
        self._transport.write(data)

    def connection_made(self, transport):
        """Introduces transport to protocol after transport is connected.

        :param twisted.internet.interfaces.ITransport transport:
        :raises Exception: Exception-based exception on error

        """
        self._transport = transport
        # The stream is now available; tell the connection so.
        self._on_stream_connected()

    def connection_lost(self, error):
        """Called upon loss or closing of TCP connection.

        NOTE: `connection_made()` and `connection_lost()` are each called just
        once and in that order. All other callbacks are called between them.

        :param Failure error: A Twisted Failure instance wrapping an exception.

        """
        self._transport = None

        # Work with the wrapped exception rather than the Failure.
        cause = error.value
        if isinstance(cause, twisted_error.ConnectionDone):
            # ConnectionDone is kept on the instance but reported to the
            # stream-terminated handler as "no error".
            self._error = cause
            cause = None

        level = logging.ERROR if cause is not None else logging.DEBUG
        LOGGER.log(level, 'connection_lost: %r', cause)
        self._on_stream_terminated(cause)

    def data_received(self, data):
        """Called to deliver incoming data from the server to the protocol.

        :param data: Non-empty data bytes.
        :raises Exception: Exception-based exception on error

        """
        self._on_data_available(data)
1144
1145
class TwistedProtocolConnection(protocol.Protocol):
    """A Twisted Protocol that speaks AMQP through Pika. Allows using
    Twisted's non-blocking connectTCP/connectSSL methods for connecting to the
    server.

    Instances expose a `ready` attribute: a Deferred which fires when the
    connection is ready to be used (the initial AMQP handshaking has been
    done). You *have* to wait for this Deferred to fire before requesting a
    channel.

    Once the connection is ready, the `closed` attribute becomes available:
    a Deferred which fires when the connection is closed.

    Because Twisted establishes the connection, no connect callbacks are
    accepted here; implement that within Twisted. For the same reason the
    host, port and ssl values of the connection parameters are ignored.

    """

    def __init__(self, parameters=None, custom_reactor=None):
        """
        :param parameters: connection parameters handed to the Pika adapter
        :param custom_reactor: reactor handed to the Pika adapter
        """

        def _notify_ready(_result):
            return self.connectionReady()

        ready = defer.Deferred()
        ready.addCallback(_notify_ready)
        self.ready = ready
        # Replaced by a Deferred once the connection is ready.
        self.closed = None
        self._impl = _TwistedConnectionAdapter(
            parameters=parameters,
            on_open_callback=self._on_connection_ready,
            on_open_error_callback=self._on_connection_failed,
            on_close_callback=self._on_connection_closed,
            custom_reactor=custom_reactor,
        )

    def channel(self, channel_number=None):  # pylint: disable=W0221
        """Create a new channel with the next available channel number or pass
        in a channel number to use. Must be non-zero if you would like to
        specify but it is recommended that you let Pika manage the channel
        numbers.

        :param int channel_number: The channel number to use, defaults to the
                                   next available.
        :returns: a Deferred that fires with an instance of a wrapper around
            the Pika Channel class.
        :rtype: Deferred

        """
        opened = defer.Deferred()
        self._impl.channel(channel_number, opened.callback)
        # Wrap the raw Pika channel before handing it to the caller.
        opened.addCallback(TwistedChannel)
        return opened

    @property
    def is_closed(self):
        # Kept for compatibility with previous releases.
        return self._impl.is_closed

    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """Close the connection unless the adapter already reports it closed.

        :param int reply_code: passed to the adapter's close()
        :param str reply_text: passed to the adapter's close()
        :returns: the current value of the `closed` attribute

        """
        impl = self._impl
        if not impl.is_closed:
            impl.close(reply_code, reply_text)
        return self.closed

    # IProtocol methods

    def dataReceived(self, data):
        # Hand the raw bytes to Pika, which does the parsing.
        self._impl.data_received(data)

    def connectionLost(self, reason=protocol.connectionDone):
        self._impl.connection_lost(reason)
        # If `ready` is still pending, report the loss to whoever waits on it.
        pending, self.ready = self.ready, None
        if not pending:
            return
        pending.errback(reason)

    def makeConnection(self, transport):
        self._impl.connection_made(transport)
        protocol.Protocol.makeConnection(self, transport)

    # Our own methods

    def connectionReady(self):
        """Hook invoked when the underlying connection is ready; does nothing
        by default.
        """

    def _on_connection_ready(self, _connection):
        pending, self.ready = self.ready, None
        if not pending:
            return
        # `closed` must exist before the `ready` callbacks run.
        self.closed = defer.Deferred()
        pending.callback(None)

    def _on_connection_failed(self, _connection, _error_message=None):
        pending, self.ready = self.ready, None
        if not pending:
            return
        attempts = self._impl.params.connection_attempts
        pending.errback(exceptions.AMQPConnectionError(attempts))

    def _on_connection_closed(self, _connection, exception):
        waiter, self.closed = self.closed, None
        if not waiter:
            return
        # Calling `callback` with a Failure instance would trigger the
        # errback path, so pass the wrapped exception instead.
        if isinstance(exception, Failure):
            result = exception.value
        else:
            result = exception
        waiter.callback(result)
1249
1250
class _TimerHandle(nbio_interface.AbstractTimerReference):
    """Timer reference for this module, implementing
    `nbio_interface.AbstractTimerReference` on top of a Twisted delayed call.

    """

    def __init__(self, handle):
        """

        :param twisted.internet.base.DelayedCall handle:
        """
        self._handle = handle

    def cancel(self):
        """Cancel the wrapped delayed call and drop the reference to it.

        Does nothing if the reference was already dropped; a call that has
        already run or been cancelled is not treated as an error.
        """
        if self._handle is None:
            return

        benign = (twisted_error.AlreadyCalled, twisted_error.AlreadyCancelled)
        try:
            self._handle.cancel()
        except benign:
            pass

        self._handle = None
1272