1"""The blocking connection adapter module implements blocking semantics on top
2of Pika's core AMQP driver. While most of the asynchronous expectations are
3removed when using the blocking connection adapter, it attempts to remain true
4to the asynchronous RPC nature of the AMQP protocol, supporting server sent
5RPC commands.
6
7The user facing classes in the module consist of the
8:py:class:`~pika.adapters.blocking_connection.BlockingConnection`
9and the :class:`~pika.adapters.blocking_connection.BlockingChannel`
10classes.
11
12"""
13# Suppress too-many-lines
14# pylint: disable=C0302
15
16# Disable "access to protected member warnings: this wrapper implementation is
17# a friend of those instances
18# pylint: disable=W0212
19
20from collections import namedtuple, deque
21import contextlib
22import functools
23import logging
24import threading
25
26import pika.compat as compat
27import pika.exceptions as exceptions
28import pika.spec
29import pika.validators as validators
30from pika.adapters.utils import connection_workflow
31
32# NOTE: import SelectConnection after others to avoid circular depenency
33from pika.adapters import select_connection
34
35LOGGER = logging.getLogger(__name__)
36
37
38class _CallbackResult(object):
39    """ CallbackResult is a non-thread-safe implementation for receiving
40    callback results; INTERNAL USE ONLY!
41    """
42    __slots__ = ('_value_class', '_ready', '_values')
43
44    def __init__(self, value_class=None):
45        """
46        :param callable value_class: only needed if the CallbackResult
47                                     instance will be used with
48                                     `set_value_once` and `append_element`.
49                                     *args and **kwargs of the value setter
50                                     methods will be passed to this class.
51
52        """
53        self._value_class = value_class
54        self._ready = None
55        self._values = None
56        self.reset()
57
58    def reset(self):
59        """Reset value, but not _value_class"""
60        self._ready = False
61        self._values = None
62
63    def __bool__(self):
64        """ Called by python runtime to implement truth value testing and the
65        built-in operation bool(); NOTE: python 3.x
66        """
67        return self.is_ready()
68
69    # python 2.x version of __bool__
70    __nonzero__ = __bool__
71
72    def __enter__(self):
73        """ Entry into context manager that automatically resets the object
74        on exit; this usage pattern helps garbage-collection by eliminating
75        potential circular references.
76        """
77        return self
78
79    def __exit__(self, *args, **kwargs):
80        """Reset value"""
81        self.reset()
82
83    def is_ready(self):
84        """
85        :returns: True if the object is in a signaled state
86        :rtype: bool
87        """
88        return self._ready
89
90    @property
91    def ready(self):
92        """True if the object is in a signaled state"""
93        return self._ready
94
95    def signal_once(self, *_args, **_kwargs):
96        """ Set as ready
97
98        :raises AssertionError: if result was already signalled
99        """
100        assert not self._ready, '_CallbackResult was already set'
101        self._ready = True
102
103    def set_value_once(self, *args, **kwargs):
104        """ Set as ready with value; the value may be retrieved via the `value`
105        property getter
106
107        :raises AssertionError: if result was already set
108        """
109        self.signal_once()
110        try:
111            self._values = (self._value_class(*args, **kwargs),)
112        except Exception:
113            LOGGER.error(
114                "set_value_once failed: value_class=%r; args=%r; kwargs=%r",
115                self._value_class, args, kwargs)
116            raise
117
118    def append_element(self, *args, **kwargs):
119        """Append an element to values"""
120        assert not self._ready or isinstance(self._values, list), (
121            '_CallbackResult state is incompatible with append_element: '
122            'ready=%r; values=%r' % (self._ready, self._values))
123
124        try:
125            value = self._value_class(*args, **kwargs)
126        except Exception:
127            LOGGER.error(
128                "append_element failed: value_class=%r; args=%r; kwargs=%r",
129                self._value_class, args, kwargs)
130            raise
131
132        if self._values is None:
133            self._values = [value]
134        else:
135            self._values.append(value)
136
137        self._ready = True
138
139    @property
140    def value(self):
141        """
142        :returns: a reference to the value that was set via `set_value_once`
143        :rtype: object
144        :raises AssertionError: if result was not set or value is incompatible
145                                with `set_value_once`
146        """
147        assert self._ready, '_CallbackResult was not set'
148        assert isinstance(self._values, tuple) and len(self._values) == 1, (
149            '_CallbackResult value is incompatible with set_value_once: %r' %
150            (self._values,))
151
152        return self._values[0]
153
154    @property
155    def elements(self):
156        """
157        :returns: a reference to the list containing one or more elements that
158            were added via `append_element`
159        :rtype: list
160        :raises AssertionError: if result was not set or value is incompatible
161                                with `append_element`
162        """
163        assert self._ready, '_CallbackResult was not set'
164        assert isinstance(self._values, list) and self._values, (
165            '_CallbackResult value is incompatible with append_element: %r' %
166            (self._values,))
167
168        return self._values
169
170
171class _IoloopTimerContext(object):
172    """Context manager for registering and safely unregistering a
173    SelectConnection ioloop-based timer
174    """
175
176    def __init__(self, duration, connection):
177        """
178        :param float duration: non-negative timer duration in seconds
179        :param select_connection.SelectConnection connection:
180        """
181        assert hasattr(connection, '_adapter_call_later'), connection
182        self._duration = duration
183        self._connection = connection
184        self._callback_result = _CallbackResult()
185        self._timer_handle = None
186
187    def __enter__(self):
188        """Register a timer"""
189        self._timer_handle = self._connection._adapter_call_later(
190            self._duration, self._callback_result.signal_once)
191        return self
192
193    def __exit__(self, *_args, **_kwargs):
194        """Unregister timer if it hasn't fired yet"""
195        if not self._callback_result:
196            self._connection._adapter_remove_timeout(self._timer_handle)
197            self._timer_handle = None
198
199    def is_ready(self):
200        """
201        :returns: True if timer has fired, False otherwise
202        :rtype: bool
203        """
204        return self._callback_result.is_ready()
205
206
207class _TimerEvt(object):
208    """Represents a timer created via `BlockingConnection.call_later`"""
209    __slots__ = ('timer_id', '_callback')
210
211    def __init__(self, callback):
212        """
213        :param callback: see callback in `BlockingConnection.call_later`
214        """
215        self._callback = callback
216
217        # Will be set to timer id returned from the underlying implementation's
218        # `_adapter_call_later` method
219        self.timer_id = None
220
221    def __repr__(self):
222        return '<%s timer_id=%s callback=%s>' % (self.__class__.__name__,
223                                                 self.timer_id, self._callback)
224
225    def dispatch(self):
226        """Dispatch the user's callback method"""
227        LOGGER.debug('_TimerEvt.dispatch: invoking callback=%r', self._callback)
228        self._callback()
229
230
231class _ConnectionBlockedUnblockedEvtBase(object):
232    """Base class for `_ConnectionBlockedEvt` and `_ConnectionUnblockedEvt`"""
233    __slots__ = ('_callback', '_method_frame')
234
235    def __init__(self, callback, method_frame):
236        """
237        :param callback: see callback parameter in
238          `BlockingConnection.add_on_connection_blocked_callback` and
239          `BlockingConnection.add_on_connection_unblocked_callback`
240        :param pika.frame.Method method_frame: with method_frame.method of type
241          `pika.spec.Connection.Blocked` or `pika.spec.Connection.Unblocked`
242        """
243        self._callback = callback
244        self._method_frame = method_frame
245
246    def __repr__(self):
247        return '<%s callback=%s, frame=%s>' % (
248            self.__class__.__name__, self._callback, self._method_frame)
249
250    def dispatch(self):
251        """Dispatch the user's callback method"""
252        self._callback(self._method_frame)
253
254
255class _ConnectionBlockedEvt(_ConnectionBlockedUnblockedEvtBase):
256    """Represents a Connection.Blocked notification from RabbitMQ broker`"""
257
258
259class _ConnectionUnblockedEvt(_ConnectionBlockedUnblockedEvtBase):
260    """Represents a Connection.Unblocked notification from RabbitMQ broker`"""
261
262
263class BlockingConnection(object):
264    """The BlockingConnection creates a layer on top of Pika's asynchronous core
265    providing methods that will block until their expected response has
266    returned. Due to the asynchronous nature of the `Basic.Deliver` and
267    `Basic.Return` calls from RabbitMQ to your application, you can still
268    implement continuation-passing style asynchronous methods if you'd like to
269    receive messages from RabbitMQ using
270    :meth:`basic_consume <BlockingChannel.basic_consume>` or if you want to be
271    notified of a delivery failure when using
272    :meth:`basic_publish <BlockingChannel.basic_publish>`.
273
274    For more information about communicating with the blocking_connection
275    adapter, be sure to check out the
276    :class:`BlockingChannel <BlockingChannel>` class which implements the
277    :class:`Channel <pika.channel.Channel>` based communication for the
278    blocking_connection adapter.
279
280    To prevent recursion/reentrancy, the blocking connection and channel
281    implementations queue asynchronously-delivered events received
282    in nested context (e.g., while waiting for `BlockingConnection.channel` or
283    `BlockingChannel.queue_declare` to complete), dispatching them synchronously
284    once nesting returns to the desired context. This concerns all callbacks,
285    such as those registered via `BlockingConnection.call_later`,
286    `BlockingConnection.add_on_connection_blocked_callback`,
287    `BlockingConnection.add_on_connection_unblocked_callback`,
288    `BlockingChannel.basic_consume`, etc.
289
290    Blocked Connection deadlock avoidance: when RabbitMQ becomes low on
291    resources, it emits Connection.Blocked (AMQP extension) to the client
292    connection when client makes a resource-consuming request on that connection
293    or its channel (e.g., `Basic.Publish`); subsequently, RabbitMQ suspsends
294    processing requests from that connection until the affected resources are
295    restored. See http://www.rabbitmq.com/connection-blocked.html. This
296    may impact `BlockingConnection` and `BlockingChannel` operations in a
297    way that users might not be expecting. For example, if the user dispatches
298    `BlockingChannel.basic_publish` in non-publisher-confirmation mode while
299    RabbitMQ is in this low-resource state followed by a synchronous request
300    (e.g., `BlockingConnection.channel`, `BlockingChannel.consume`,
301    `BlockingChannel.basic_consume`, etc.), the synchronous request will block
302    indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If
303    the blocked state persists for a long time, the blocking operation will
304    appear to hang. In this state, `BlockingConnection` instance and its
305    channels will not dispatch user callbacks. SOLUTION: To break this potential
306    deadlock, applications may configure the `blocked_connection_timeout`
307    connection parameter when instantiating `BlockingConnection`. Upon blocked
308    connection timeout, this adapter will raise ConnectionBlockedTimeout
309    exception`. See `pika.connection.ConnectionParameters` documentation to
310    learn more about the `blocked_connection_timeout` configuration.
311
312    """
313    # Connection-closing callback args
314    _OnClosedArgs = namedtuple('BlockingConnection__OnClosedArgs',
315                               'connection error')
316
317    # Channel-opened callback args
318    _OnChannelOpenedArgs = namedtuple('BlockingConnection__OnChannelOpenedArgs',
319                                      'channel')
320
321    def __init__(self, parameters=None, _impl_class=None):
322        """Create a new instance of the Connection object.
323
324        :param None | pika.connection.Parameters | sequence parameters:
325            Connection parameters instance or non-empty sequence of them. If
326            None, a `pika.connection.Parameters` instance will be created with
327            default settings. See `pika.AMQPConnectionWorkflow` for more
328            details about multiple parameter configurations and retries.
329        :param _impl_class: for tests/debugging only; implementation class;
330            None=default
331
332        :raises RuntimeError:
333
334        """
335        # Used for mutual exclusion to avoid race condition between
336        # BlockingConnection._cleanup() and another thread calling
337        # BlockingConnection.add_callback_threadsafe() against a closed
338        # ioloop.
339        self._cleanup_mutex = threading.Lock()
340
341        # Used by the _acquire_event_dispatch decorator; when already greater
342        # than 0, event dispatch is already acquired higher up the call stack
343        self._event_dispatch_suspend_depth = 0
344
345        # Connection-specific events that are ready for dispatch: _TimerEvt,
346        # _ConnectionBlockedEvt, _ConnectionUnblockedEvt
347        self._ready_events = deque()
348
349        # Channel numbers of channels that are requesting a call to their
350        # BlockingChannel._dispatch_events method; See
351        # `_request_channel_dispatch`
352        self._channels_pending_dispatch = set()
353
354        # Receives on_close_callback args from Connection
355        self._closed_result = _CallbackResult(self._OnClosedArgs)
356
357        # Perform connection workflow
358        self._impl = None  # so that attribute is created in case below raises
359        self._impl = self._create_connection(parameters, _impl_class)
360        self._impl.add_on_close_callback(self._closed_result.set_value_once)
361
362    def __repr__(self):
363        return '<%s impl=%r>' % (self.__class__.__name__, self._impl)
364
365    def __enter__(self):
366        # Prepare `with` context
367        return self
368
369    def __exit__(self, exc_type, value, traceback):
370        # Close connection after `with` context
371        if self.is_open:
372            self.close()
373
374    def _cleanup(self):
375        """Clean up members that might inhibit garbage collection
376
377        """
378        with self._cleanup_mutex:
379            if self._impl is not None:
380                self._impl.ioloop.close()
381            self._ready_events.clear()
382            self._closed_result.reset()
383
384    @contextlib.contextmanager
385    def _acquire_event_dispatch(self):
386        """ Context manager that controls access to event dispatcher for
387        preventing reentrancy.
388
389        The "as" value is True if the managed code block owns the event
390        dispatcher and False if caller higher up in the call stack already owns
391        it. Only managed code that gets ownership (got True) is permitted to
392        dispatch
393        """
394        try:
395            # __enter__ part
396            self._event_dispatch_suspend_depth += 1
397            yield self._event_dispatch_suspend_depth == 1
398        finally:
399            # __exit__ part
400            self._event_dispatch_suspend_depth -= 1
401
402    def _create_connection(self, configs, impl_class):
403        """Run connection workflow, blocking until it completes.
404
405        :param None | pika.connection.Parameters | sequence configs: Connection
406            parameters instance or non-empty sequence of them.
407        :param None | SelectConnection impl_class: for tests/debugging only;
408            implementation class;
409
410        :rtype: impl_class
411
412        :raises: exception on failure
413        """
414
415        if configs is None:
416            configs = (pika.connection.Parameters(),)
417
418        if isinstance(configs, pika.connection.Parameters):
419            configs = (configs,)
420
421        if not configs:
422            raise ValueError('Expected a non-empty sequence of connection '
423                             'parameters, but got {!r}.'.format(configs))
424
425        # Connection workflow completion args
426        #   `result` may be an instance of connection on success or exception on
427        #   failure.
428        on_cw_done_result = _CallbackResult(
429            namedtuple('BlockingConnection_OnConnectionWorkflowDoneArgs',
430                       'result'))
431
432        impl_class = impl_class or select_connection.SelectConnection
433
434        ioloop = select_connection.IOLoop()
435
436        ioloop.activate_poller()
437        try:
438            impl_class.create_connection(
439                configs,
440                on_done=on_cw_done_result.set_value_once,
441                custom_ioloop=ioloop)
442
443            while not on_cw_done_result.ready:
444                ioloop.poll()
445                ioloop.process_timeouts()
446
447            if isinstance(on_cw_done_result.value.result, BaseException):
448                error = on_cw_done_result.value.result
449                LOGGER.error('Connection workflow failed: %r', error)
450                raise self._reap_last_connection_workflow_error(error)
451            else:
452                LOGGER.info('Connection workflow succeeded: %r',
453                            on_cw_done_result.value.result)
454                return on_cw_done_result.value.result
455        except Exception:
456            LOGGER.exception('Error in _create_connection().')
457            ioloop.close()
458            self._cleanup()
459            raise
460
461    @staticmethod
462    def _reap_last_connection_workflow_error(error):
463        """Extract exception value from the last connection attempt
464
465        :param Exception error: error passed by the `AMQPConnectionWorkflow`
466            completion callback.
467
468        :returns: Exception value from the last connection attempt
469        :rtype: Exception
470        """
471        if isinstance(error, connection_workflow.AMQPConnectionWorkflowFailed):
472            # Extract exception value from the last connection attempt
473            error = error.exceptions[-1]
474            if isinstance(error,
475                          connection_workflow.AMQPConnectorSocketConnectError):
476                error = exceptions.AMQPConnectionError(error)
477            elif isinstance(error,
478                            connection_workflow.AMQPConnectorPhaseErrorBase):
479                error = error.exception
480
481        return error
482
483    def _flush_output(self, *waiters):
484        """ Flush output and process input while waiting for any of the given
485        callbacks to return true. The wait is aborted upon connection-close.
486        Otherwise, processing continues until the output is flushed AND at least
487        one of the callbacks returns true. If there are no callbacks, then
488        processing ends when all output is flushed.
489
490        :param waiters: sequence of zero or more callables taking no args and
491                        returning true when it's time to stop processing.
492                        Their results are OR'ed together.
493        :raises: exceptions passed by impl if opening of connection fails or
494            connection closes.
495        """
496        if self.is_closed:
497            raise exceptions.ConnectionWrongStateError()
498
499        # Conditions for terminating the processing loop:
500        #   connection closed
501        #         OR
502        #   empty outbound buffer and no waiters
503        #         OR
504        #   empty outbound buffer and any waiter is ready
505        is_done = (lambda:
506                   self._closed_result.ready or
507                   ((not self._impl._transport or
508                     self._impl._get_write_buffer_size() == 0) and
509                    (not waiters or any(ready() for ready in waiters))))
510
511        # Process I/O until our completion condition is satisfied
512        while not is_done():
513            self._impl.ioloop.poll()
514            self._impl.ioloop.process_timeouts()
515
516        if self._closed_result.ready:
517            try:
518                if not isinstance(self._closed_result.value.error,
519                                  exceptions.ConnectionClosedByClient):
520                    LOGGER.error('Unexpected connection close detected: %r',
521                                 self._closed_result.value.error)
522                    raise self._closed_result.value.error
523                else:
524                    LOGGER.info('User-initiated close: result=%r',
525                                self._closed_result.value)
526            finally:
527                self._cleanup()
528
529    def _request_channel_dispatch(self, channel_number):
530        """Called by BlockingChannel instances to request a call to their
531        _dispatch_events method or to terminate `process_data_events`;
532        BlockingConnection will honor these requests from a safe context.
533
534        :param int channel_number: positive channel number to request a call
535            to the channel's `_dispatch_events`; a negative channel number to
536            request termination of `process_data_events`
537        """
538        self._channels_pending_dispatch.add(channel_number)
539
540    def _dispatch_channel_events(self):
541        """Invoke the `_dispatch_events` method on open channels that requested
542        it
543        """
544        if not self._channels_pending_dispatch:
545            return
546
547        with self._acquire_event_dispatch() as dispatch_acquired:
548            if not dispatch_acquired:
549                # Nested dispatch or dispatch blocked higher in call stack
550                return
551
552            candidates = list(self._channels_pending_dispatch)
553            self._channels_pending_dispatch.clear()
554
555            for channel_number in candidates:
556                if channel_number < 0:
557                    # This was meant to terminate process_data_events
558                    continue
559
560                try:
561                    impl_channel = self._impl._channels[channel_number]
562                except KeyError:
563                    continue
564
565                if impl_channel.is_open:
566                    impl_channel._get_cookie()._dispatch_events()
567
568    def _on_timer_ready(self, evt):
569        """Handle expiry of a timer that was registered via
570        `_adapter_call_later()`
571
572        :param _TimerEvt evt:
573
574        """
575        self._ready_events.append(evt)
576
577    def _on_threadsafe_callback(self, user_callback):
578        """Handle callback that was registered via
579        `self._impl._adapter_add_callback_threadsafe`.
580
581        :param user_callback: callback passed to our
582            `add_callback_threadsafe` by the application.
583
584        """
585        # Turn it into a 0-delay timeout to take advantage of our existing logic
586        # that deals with reentrancy
587        self.call_later(0, user_callback)
588
589    def _on_connection_blocked(self, user_callback, _impl, method_frame):
590        """Handle Connection.Blocked notification from RabbitMQ broker
591
592        :param callable user_callback: callback passed to
593           `add_on_connection_blocked_callback`
594        :param select_connection.SelectConnection _impl:
595        :param pika.frame.Method method_frame: method frame having `method`
596            member of type `pika.spec.Connection.Blocked`
597        """
598        self._ready_events.append(
599            _ConnectionBlockedEvt(user_callback, method_frame))
600
601    def _on_connection_unblocked(self, user_callback, _impl, method_frame):
602        """Handle Connection.Unblocked notification from RabbitMQ broker
603
604        :param callable user_callback: callback passed to
605           `add_on_connection_unblocked_callback`
606        :param select_connection.SelectConnection _impl:
607        :param pika.frame.Method method_frame: method frame having `method`
608            member of type `pika.spec.Connection.Blocked`
609        """
610        self._ready_events.append(
611            _ConnectionUnblockedEvt(user_callback, method_frame))
612
613    def _dispatch_connection_events(self):
614        """Dispatch ready connection events"""
615        if not self._ready_events:
616            return
617
618        with self._acquire_event_dispatch() as dispatch_acquired:
619            if not dispatch_acquired:
620                # Nested dispatch or dispatch blocked higher in call stack
621                return
622
623            # Limit dispatch to the number of currently ready events to avoid
624            # getting stuck in this loop
625            for _ in compat.xrange(len(self._ready_events)):
626                try:
627                    evt = self._ready_events.popleft()
628                except IndexError:
629                    # Some events (e.g., timers) must have been cancelled
630                    break
631
632                evt.dispatch()
633
634    def add_on_connection_blocked_callback(self, callback):
635        """RabbitMQ AMQP extension - Add a callback to be notified when the
636        connection gets blocked (`Connection.Blocked` received from RabbitMQ)
637        due to the broker running low on resources (memory or disk). In this
638        state RabbitMQ suspends processing incoming data until the connection
639        is unblocked, so it's a good idea for publishers receiving this
640        notification to suspend publishing until the connection becomes
641        unblocked.
642
643        NOTE: due to the blocking nature of BlockingConnection, if it's sending
644        outbound data while the connection is/becomes blocked, the call may
645        remain blocked until the connection becomes unblocked, if ever. You
646        may use `ConnectionParameters.blocked_connection_timeout` to abort a
647        BlockingConnection method call with an exception when the connection
648        remains blocked longer than the given timeout value.
649
650        See also `Connection.add_on_connection_unblocked_callback()`
651
652        See also `ConnectionParameters.blocked_connection_timeout`.
653
654        :param callable callback: Callback to call on `Connection.Blocked`,
655            having the signature `callback(connection, pika.frame.Method)`,
656            where connection is the `BlockingConnection` instance and the method
657            frame's `method` member is of type `pika.spec.Connection.Blocked`
658
659        """
660        self._impl.add_on_connection_blocked_callback(
661            functools.partial(self._on_connection_blocked,
662                              functools.partial(callback, self)))
663
664    def add_on_connection_unblocked_callback(self, callback):
665        """RabbitMQ AMQP extension - Add a callback to be notified when the
666        connection gets unblocked (`Connection.Unblocked` frame is received from
667        RabbitMQ) letting publishers know it's ok to start publishing again.
668
669        :param callable callback: Callback to call on Connection.Unblocked`,
670            having the signature `callback(connection, pika.frame.Method)`,
671            where connection is the `BlockingConnection` instance and the method
672             frame's `method` member is of type `pika.spec.Connection.Unblocked`
673
674        """
675        self._impl.add_on_connection_unblocked_callback(
676            functools.partial(self._on_connection_unblocked,
677                              functools.partial(callback, self)))
678
679    def call_later(self, delay, callback):
680        """Create a single-shot timer to fire after delay seconds. Do not
681        confuse with Tornado's timeout where you pass in the time you want to
682        have your callback called. Only pass in the seconds until it's to be
683        called.
684
685        NOTE: the timer callbacks are dispatched only in the scope of
686        specially-designated methods: see
687        `BlockingConnection.process_data_events()` and
688        `BlockingChannel.start_consuming()`.
689
690        :param float delay: The number of seconds to wait to call callback
691        :param callable callback: The callback method with the signature
692            callback()
693        :returns: Opaque timer id
694        :rtype: int
695
696        """
697        validators.require_callback(callback)
698
699        evt = _TimerEvt(callback=callback)
700        timer_id = self._impl._adapter_call_later(
701            delay, functools.partial(self._on_timer_ready, evt))
702        evt.timer_id = timer_id
703
704        return timer_id
705
706    def add_callback_threadsafe(self, callback):
707        """Requests a call to the given function as soon as possible in the
708        context of this connection's thread.
709
710        NOTE: This is the only thread-safe method in `BlockingConnection`. All
711        other manipulations of `BlockingConnection` must be performed from the
712        connection's thread.
713
714        NOTE: the callbacks are dispatched only in the scope of
715        specially-designated methods: see
716        `BlockingConnection.process_data_events()` and
717        `BlockingChannel.start_consuming()`.
718
719        For example, a thread may request a call to the
720        `BlockingChannel.basic_ack` method of a `BlockingConnection` that is
721        running in a different thread via
722
723        ```
724        connection.add_callback_threadsafe(
725            functools.partial(channel.basic_ack, delivery_tag=...))
726        ```
727
728        NOTE: if you know that the requester is running on the same thread as
729        the connection it is more efficient to use the
730        `BlockingConnection.call_later()` method with a delay of 0.
731
732        :param callable callback: The callback method; must be callable
733        :raises pika.exceptions.ConnectionWrongStateError: if connection is
734            closed
735        """
736        with self._cleanup_mutex:
737            # NOTE: keep in mind that we may be called from another thread and
738            # this mutex only synchronizes us with our connection cleanup logic,
739            # so a simple check for "is_closed" is pretty much all we're allowed
740            # to do here besides calling the only thread-safe method
741            # _adapter_add_callback_threadsafe().
742            if self.is_closed:
743                raise exceptions.ConnectionWrongStateError(
744                    'BlockingConnection.add_callback_threadsafe() called on '
745                    'closed or closing connection.')
746
747            self._impl._adapter_add_callback_threadsafe(
748                functools.partial(self._on_threadsafe_callback, callback))
749
750    def remove_timeout(self, timeout_id):
751        """Remove a timer if it's still in the timeout stack
752
753        :param timeout_id: The opaque timer id to remove
754
755        """
756        # Remove from the impl's timeout stack
757        self._impl._adapter_remove_timeout(timeout_id)
758
759        # Remove from ready events, if the timer fired already
760        for i, evt in enumerate(self._ready_events):
761            if isinstance(evt, _TimerEvt) and evt.timer_id == timeout_id:
762                index_to_remove = i
763                break
764        else:
765            # Not found
766            return
767
768        del self._ready_events[index_to_remove]
769
770    def close(self, reply_code=200, reply_text='Normal shutdown'):
771        """Disconnect from RabbitMQ. If there are any open channels, it will
772        attempt to close them prior to fully disconnecting. Channels which
773        have active consumers will attempt to send a Basic.Cancel to RabbitMQ
774        to cleanly stop the delivery of messages prior to closing the channel.
775
776        :param int reply_code: The code number for the close
777        :param str reply_text: The text reason for the close
778
779        :raises pika.exceptions.ConnectionWrongStateError: if called on a closed
780            connection (NEW in v1.0.0)
781        """
782        if not self.is_open:
783            msg = '{}.close({}, {!r}) called on closed connection.'.format(
784                self.__class__.__name__, reply_code, reply_text)
785            LOGGER.error(msg)
786            raise exceptions.ConnectionWrongStateError(msg)
787
788        LOGGER.info('Closing connection (%s): %s', reply_code, reply_text)
789
790        # Close channels that remain opened
791        for impl_channel in compat.dictvalues(self._impl._channels):
792            channel = impl_channel._get_cookie()
793            if channel.is_open:
794                try:
795                    channel.close(reply_code, reply_text)
796                except exceptions.ChannelClosed as exc:
797                    # Log and suppress broker-closed channel
798                    LOGGER.warning(
799                        'Got ChannelClosed while closing channel '
800                        'from connection.close: %r', exc)
801
802        # Close the connection
803        self._impl.close(reply_code, reply_text)
804
805        self._flush_output(self._closed_result.is_ready)
806
807    def process_data_events(self, time_limit=0):
808        """Will make sure that data events are processed. Dispatches timer and
809        channel callbacks if not called from the scope of BlockingConnection or
810        BlockingChannel callback. Your app can block on this method.
811
812        :param float time_limit: suggested upper bound on processing time in
813            seconds. The actual blocking time depends on the granularity of the
814            underlying ioloop. Zero means return as soon as possible. None means
815            there is no limit on processing time and the function will block
816            until I/O produces actionable events. Defaults to 0 for backward
817            compatibility. This parameter is NEW in pika 0.10.0.
818        """
819        with self._acquire_event_dispatch() as dispatch_acquired:
820            # Check if we can actually process pending events
821            common_terminator = lambda: bool(dispatch_acquired and
822                                             (self._channels_pending_dispatch or
823                                              self._ready_events))
824            if time_limit is None:
825                self._flush_output(common_terminator)
826            else:
827                with _IoloopTimerContext(time_limit, self._impl) as timer:
828                    self._flush_output(timer.is_ready, common_terminator)
829
830        if self._ready_events:
831            self._dispatch_connection_events()
832
833        if self._channels_pending_dispatch:
834            self._dispatch_channel_events()
835
836    def sleep(self, duration):
837        """A safer way to sleep than calling time.sleep() directly that would
838        keep the adapter from ignoring frames sent from the broker. The
839        connection will "sleep" or block the number of seconds specified in
840        duration in small intervals.
841
842        :param float duration: The time to sleep in seconds
843
844        """
845        assert duration >= 0, duration
846
847        deadline = compat.time_now() + duration
848        time_limit = duration
849        # Process events at least once
850        while True:
851            self.process_data_events(time_limit)
852            time_limit = deadline - compat.time_now()
853            if time_limit <= 0:
854                break
855
856    def channel(self, channel_number=None):
857        """Create a new channel with the next available channel number or pass
858        in a channel number to use. Must be non-zero if you would like to
859        specify but it is recommended that you let Pika manage the channel
860        numbers.
861
862        :rtype: pika.adapters.blocking_connection.BlockingChannel
863        """
864        with _CallbackResult(self._OnChannelOpenedArgs) as opened_args:
865            impl_channel = self._impl.channel(
866                channel_number=channel_number,
867                on_open_callback=opened_args.set_value_once)
868
869            # Create our proxy channel
870            channel = BlockingChannel(impl_channel, self)
871
872            # Link implementation channel with our proxy channel
873            impl_channel._set_cookie(channel)
874
875            # Drive I/O until Channel.Open-ok
876            channel._flush_output(opened_args.is_ready)
877
878        return channel
879
880    #
881    # Connections state properties
882    #
883
884    @property
885    def is_closed(self):
886        """
887        Returns a boolean reporting the current connection state.
888        """
889        return self._impl.is_closed
890
891    @property
892    def is_open(self):
893        """
894        Returns a boolean reporting the current connection state.
895        """
896        return self._impl.is_open
897
898    #
899    # Properties that reflect server capabilities for the current connection
900    #
901
902    @property
903    def basic_nack_supported(self):
904        """Specifies if the server supports basic.nack on the active connection.
905
906        :rtype: bool
907
908        """
909        return self._impl.basic_nack
910
911    @property
912    def consumer_cancel_notify_supported(self):
913        """Specifies if the server supports consumer cancel notification on the
914        active connection.
915
916        :rtype: bool
917
918        """
919        return self._impl.consumer_cancel_notify
920
921    @property
922    def exchange_exchange_bindings_supported(self):
923        """Specifies if the active connection supports exchange to exchange
924        bindings.
925
926        :rtype: bool
927
928        """
929        return self._impl.exchange_exchange_bindings
930
931    @property
932    def publisher_confirms_supported(self):
933        """Specifies if the active connection can use publisher confirmations.
934
935        :rtype: bool
936
937        """
938        return self._impl.publisher_confirms
939
940    # Legacy property names for backward compatibility
941    basic_nack = basic_nack_supported
942    consumer_cancel_notify = consumer_cancel_notify_supported
943    exchange_exchange_bindings = exchange_exchange_bindings_supported
944    publisher_confirms = publisher_confirms_supported
945
946
947class _ChannelPendingEvt(object):
948    """Base class for BlockingChannel pending events"""
949
950
951class _ConsumerDeliveryEvt(_ChannelPendingEvt):
952    """This event represents consumer message delivery `Basic.Deliver`; it
953    contains method, properties, and body of the delivered message.
954    """
955
956    __slots__ = ('method', 'properties', 'body')
957
958    def __init__(self, method, properties, body):
959        """
960        :param spec.Basic.Deliver method: NOTE: consumer_tag and delivery_tag
961          are valid only within source channel
962        :param spec.BasicProperties properties: message properties
963        :param bytes body: message body; empty string if no body
964        """
965        self.method = method
966        self.properties = properties
967        self.body = body
968
969
970class _ConsumerCancellationEvt(_ChannelPendingEvt):
971    """This event represents server-initiated consumer cancellation delivered to
972    client via Basic.Cancel. After receiving Basic.Cancel, there will be no
973    further deliveries for the consumer identified by `consumer_tag` in
974    `Basic.Cancel`
975    """
976
977    __slots__ = ('method_frame',)
978
979    def __init__(self, method_frame):
980        """
981        :param pika.frame.Method method_frame: method frame with method of type
982            `spec.Basic.Cancel`
983        """
984        self.method_frame = method_frame
985
986    def __repr__(self):
987        return '<%s method_frame=%r>' % (self.__class__.__name__,
988                                         self.method_frame)
989
990    @property
991    def method(self):
992        """method of type spec.Basic.Cancel"""
993        return self.method_frame.method
994
995
996class _ReturnedMessageEvt(_ChannelPendingEvt):
997    """This event represents a message returned by broker via `Basic.Return`"""
998
999    __slots__ = ('callback', 'channel', 'method', 'properties', 'body')
1000
1001    def __init__(self, callback, channel, method, properties, body):
1002        """
1003        :param callable callback: user's callback, having the signature
1004            callback(channel, method, properties, body), where
1005                                channel: pika.Channel
1006                                method: pika.spec.Basic.Return
1007                                properties: pika.spec.BasicProperties
1008                                body: bytes
1009        :param pika.Channel channel:
1010        :param pika.spec.Basic.Return method:
1011        :param pika.spec.BasicProperties properties:
1012        :param bytes body:
1013        """
1014        self.callback = callback
1015        self.channel = channel
1016        self.method = method
1017        self.properties = properties
1018        self.body = body
1019
1020    def __repr__(self):
1021        return ('<%s callback=%r channel=%r method=%r properties=%r '
1022                'body=%.300r>') % (self.__class__.__name__, self.callback,
1023                                   self.channel, self.method, self.properties,
1024                                   self.body)
1025
1026    def dispatch(self):
1027        """Dispatch user's callback"""
1028        self.callback(self.channel, self.method, self.properties, self.body)
1029
1030
1031class ReturnedMessage(object):
1032    """Represents a message returned via Basic.Return in publish-acknowledgments
1033    mode
1034    """
1035
1036    __slots__ = ('method', 'properties', 'body')
1037
1038    def __init__(self, method, properties, body):
1039        """
1040        :param spec.Basic.Return method:
1041        :param spec.BasicProperties properties: message properties
1042        :param bytes body: message body; empty string if no body
1043        """
1044        self.method = method
1045        self.properties = properties
1046        self.body = body
1047
1048
1049class _ConsumerInfo(object):
1050    """Information about an active consumer"""
1051
1052    __slots__ = ('consumer_tag', 'auto_ack', 'on_message_callback',
1053                 'alternate_event_sink', 'state')
1054
1055    # Consumer states
1056    SETTING_UP = 1
1057    ACTIVE = 2
1058    TEARING_DOWN = 3
1059    CANCELLED_BY_BROKER = 4
1060
1061    def __init__(self,
1062                 consumer_tag,
1063                 auto_ack,
1064                 on_message_callback=None,
1065                 alternate_event_sink=None):
1066        """
1067        NOTE: exactly one of callback/alternate_event_sink musts be non-None.
1068
1069        :param str consumer_tag:
1070        :param bool auto_ack: the no-ack value for the consumer
1071        :param callable on_message_callback: The function for dispatching messages to
1072            user, having the signature:
1073            on_message_callback(channel, method, properties, body)
1074                channel: BlockingChannel
1075                method: spec.Basic.Deliver
1076                properties: spec.BasicProperties
1077                body: bytes
1078        :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt
1079            and _ConsumerCancellationEvt objects will be diverted to this
1080            callback instead of being deposited in the channel's
1081            `_pending_events` container. Signature:
1082            alternate_event_sink(evt)
1083        """
1084        assert (on_message_callback is None) != (
1085            alternate_event_sink is None
1086        ), ('exactly one of on_message_callback/alternate_event_sink must be non-None',
1087            on_message_callback, alternate_event_sink)
1088        self.consumer_tag = consumer_tag
1089        self.auto_ack = auto_ack
1090        self.on_message_callback = on_message_callback
1091        self.alternate_event_sink = alternate_event_sink
1092        self.state = self.SETTING_UP
1093
1094    @property
1095    def setting_up(self):
1096        """True if in SETTING_UP state"""
1097        return self.state == self.SETTING_UP
1098
1099    @property
1100    def active(self):
1101        """True if in ACTIVE state"""
1102        return self.state == self.ACTIVE
1103
1104    @property
1105    def tearing_down(self):
1106        """True if in TEARING_DOWN state"""
1107        return self.state == self.TEARING_DOWN
1108
1109    @property
1110    def cancelled_by_broker(self):
1111        """True if in CANCELLED_BY_BROKER state"""
1112        return self.state == self.CANCELLED_BY_BROKER
1113
1114
1115class _QueueConsumerGeneratorInfo(object):
1116    """Container for information about the active queue consumer generator """
1117    __slots__ = ('params', 'consumer_tag', 'pending_events')
1118
1119    def __init__(self, params, consumer_tag):
1120        """
1121        :params tuple params: a three-tuple (queue, auto_ack, exclusive) that were
1122           used to create the queue consumer
1123        :param str consumer_tag: consumer tag
1124        """
1125        self.params = params
1126        self.consumer_tag = consumer_tag
1127        #self.messages = deque()
1128
1129        # Holds pending events of types _ConsumerDeliveryEvt and
1130        # _ConsumerCancellationEvt
1131        self.pending_events = deque()
1132
1133    def __repr__(self):
1134        return '<%s params=%r consumer_tag=%r>' % (
1135            self.__class__.__name__, self.params, self.consumer_tag)
1136
1137
1138class BlockingChannel(object):
1139    """The BlockingChannel implements blocking semantics for most things that
1140    one would use callback-passing-style for with the
1141    :py:class:`~pika.channel.Channel` class. In addition,
1142    the `BlockingChannel` class implements a :term:`generator` that allows
1143    you to :doc:`consume messages </examples/blocking_consumer_generator>`
1144    without using callbacks.
1145
1146    Example of creating a BlockingChannel::
1147
1148        import pika
1149
1150        # Create our connection object
1151        connection = pika.BlockingConnection()
1152
1153        # The returned object will be a synchronous channel
1154        channel = connection.channel()
1155
1156    """
1157
1158    # Used as value_class with _CallbackResult for receiving Basic.GetOk args
1159    _RxMessageArgs = namedtuple(
1160        'BlockingChannel__RxMessageArgs',
1161        [
1162            'channel',  # implementation pika.Channel instance
1163            'method',  # Basic.GetOk
1164            'properties',  # pika.spec.BasicProperties
1165            'body'  # str, unicode, or bytes (python 3.x)
1166        ])
1167
1168    # For use as value_class with any _CallbackResult that expects method_frame
1169    # as the only arg
1170    _MethodFrameCallbackResultArgs = namedtuple(
1171        'BlockingChannel__MethodFrameCallbackResultArgs', 'method_frame')
1172
1173    # Broker's basic-ack/basic-nack args when delivery confirmation is enabled;
1174    # may concern a single or multiple messages
1175    _OnMessageConfirmationReportArgs = namedtuple(
1176        'BlockingChannel__OnMessageConfirmationReportArgs', 'method_frame')
1177
1178    # For use as value_class with _CallbackResult expecting Channel.Flow
1179    # confirmation.
1180    _FlowOkCallbackResultArgs = namedtuple(
1181        'BlockingChannel__FlowOkCallbackResultArgs',
1182        'active'  # True if broker will start or continue sending; False if not
1183    )
1184
1185    _CONSUMER_CANCELLED_CB_KEY = 'blocking_channel_consumer_cancelled'
1186
1187    def __init__(self, channel_impl, connection):
1188        """Create a new instance of the Channel
1189
1190        :param pika.channel.Channel channel_impl: Channel implementation object
1191            as returned from SelectConnection.channel()
1192        :param BlockingConnection connection: The connection object
1193
1194        """
1195        self._impl = channel_impl
1196        self._connection = connection
1197
1198        # A mapping of consumer tags to _ConsumerInfo for active consumers
1199        self._consumer_infos = dict()
1200
1201        # Queue consumer generator generator info of type
1202        # _QueueConsumerGeneratorInfo created by BlockingChannel.consume
1203        self._queue_consumer_generator = None
1204
1205        # Whether RabbitMQ delivery confirmation has been enabled
1206        self._delivery_confirmation = False
1207
1208        # Receives message delivery confirmation report (Basic.ack or
1209        # Basic.nack) from broker when delivery confirmations are enabled
1210        self._message_confirmation_result = _CallbackResult(
1211            self._OnMessageConfirmationReportArgs)
1212
1213        # deque of pending events: _ConsumerDeliveryEvt and
1214        # _ConsumerCancellationEvt objects that will be returned by
1215        # `BlockingChannel.get_event()`
1216        self._pending_events = deque()
1217
1218        # Holds a ReturnedMessage object representing a message received via
1219        # Basic.Return in publisher-acknowledgments mode.
1220        self._puback_return = None
1221
1222        # self._on_channel_closed() saves the reason exception here
1223        self._closing_reason = None  # type: None | Exception
1224
1225        # Receives Basic.ConsumeOk reply from server
1226        self._basic_consume_ok_result = _CallbackResult()
1227
1228        # Receives args from Basic.GetEmpty response
1229        #  http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
1230        self._basic_getempty_result = _CallbackResult(
1231            self._MethodFrameCallbackResultArgs)
1232
1233        self._impl.add_on_cancel_callback(self._on_consumer_cancelled_by_broker)
1234
1235        self._impl.add_callback(
1236            self._basic_consume_ok_result.signal_once,
1237            replies=[pika.spec.Basic.ConsumeOk],
1238            one_shot=False)
1239
1240        self._impl.add_on_close_callback(self._on_channel_closed)
1241
1242        self._impl.add_callback(
1243            self._basic_getempty_result.set_value_once,
1244            replies=[pika.spec.Basic.GetEmpty],
1245            one_shot=False)
1246
1247        LOGGER.info("Created channel=%s", self.channel_number)
1248
1249    def __int__(self):
1250        """Return the channel object as its channel number
1251
1252        NOTE: inherited from legacy BlockingConnection; might be error-prone;
1253        use `channel_number` property instead.
1254
1255        :rtype: int
1256
1257        """
1258        return self.channel_number
1259
1260    def __repr__(self):
1261        return '<%s impl=%r>' % (self.__class__.__name__, self._impl)
1262
1263    def __enter__(self):
1264        return self
1265
1266    def __exit__(self, exc_type, value, traceback):
1267        if self.is_open:
1268            self.close()
1269
1270    def _cleanup(self):
1271        """Clean up members that might inhibit garbage collection"""
1272        self._message_confirmation_result.reset()
1273        self._pending_events = deque()
1274        self._consumer_infos = dict()
1275        self._queue_consumer_generator = None
1276
1277    @property
1278    def channel_number(self):
1279        """Channel number"""
1280        return self._impl.channel_number
1281
1282    @property
1283    def connection(self):
1284        """The channel's BlockingConnection instance"""
1285        return self._connection
1286
1287    @property
1288    def is_closed(self):
1289        """Returns True if the channel is closed.
1290
1291        :rtype: bool
1292
1293        """
1294        return self._impl.is_closed
1295
1296    @property
1297    def is_open(self):
1298        """Returns True if the channel is open.
1299
1300        :rtype: bool
1301
1302        """
1303        return self._impl.is_open
1304
1305    @property
1306    def consumer_tags(self):
1307        """Property method that returns a list of consumer tags for active
1308        consumers
1309
1310        :rtype: list
1311
1312        """
1313        return compat.dictkeys(self._consumer_infos)
1314
1315    _ALWAYS_READY_WAITERS = ((lambda: True),)
1316
1317    def _flush_output(self, *waiters):
1318        """ Flush output and process input while waiting for any of the given
1319        callbacks to return true. The wait is aborted upon channel-close or
1320        connection-close.
1321        Otherwise, processing continues until the output is flushed AND at least
1322        one of the callbacks returns true. If there are no callbacks, then
1323        processing ends when all output is flushed.
1324
1325        :param waiters: sequence of zero or more callables taking no args and
1326                        returning true when it's time to stop processing.
1327                        Their results are OR'ed together. An empty sequence is
1328                        treated as equivalent to a waiter always returning true.
1329        """
1330        if self.is_closed:
1331            self._impl._raise_if_not_open()
1332
1333        if not waiters:
1334            waiters = self._ALWAYS_READY_WAITERS
1335
1336        self._connection._flush_output(lambda: self.is_closed, *waiters)
1337
1338        if self.is_closed and isinstance(self._closing_reason,
1339                                         exceptions.ChannelClosedByBroker):
1340            raise self._closing_reason  # pylint: disable=E0702
1341
1342    def _on_puback_message_returned(self, channel, method, properties, body):
1343        """Called as the result of Basic.Return from broker in
1344        publisher-acknowledgements mode. Saves the info as a ReturnedMessage
1345        instance in self._puback_return.
1346
1347        :param pika.Channel channel: our self._impl channel
1348        :param pika.spec.Basic.Return method:
1349        :param pika.spec.BasicProperties properties: message properties
1350        :param bytes body: returned message body; empty string if no body
1351        """
1352        assert channel is self._impl, (channel.channel_number,
1353                                       self.channel_number)
1354
1355        assert isinstance(method, pika.spec.Basic.Return), method
1356        assert isinstance(properties, pika.spec.BasicProperties), (properties)
1357
1358        LOGGER.warning(
1359            "Published message was returned: _delivery_confirmation=%s; "
1360            "channel=%s; method=%r; properties=%r; body_size=%d; "
1361            "body_prefix=%.255r", self._delivery_confirmation,
1362            channel.channel_number, method, properties,
1363            len(body) if body is not None else None, body)
1364
1365        self._puback_return = ReturnedMessage(method, properties, body)
1366
1367    def _add_pending_event(self, evt):
1368        """Append an event to the channel's list of events that are ready for
1369        dispatch to user and signal our connection that this channel is ready
1370        for event dispatch
1371
1372        :param _ChannelPendingEvt evt: an event derived from _ChannelPendingEvt
1373        """
1374        self._pending_events.append(evt)
1375        self.connection._request_channel_dispatch(self.channel_number)
1376
1377    def _on_channel_closed(self, _channel, reason):
1378        """Callback from impl notifying us that the channel has been closed.
1379        This may be as the result of user-, broker-, or internal connection
1380        clean-up initiated closing or meta-closing of the channel.
1381
1382        If it resulted from receiving `Channel.Close` from broker, we will
1383        expedite waking up of the event subsystem so that it may respond by
1384        raising `ChannelClosed` from user's context.
1385
1386        NOTE: We can't raise exceptions in callbacks in order to protect
1387        the integrity of the underlying implementation. BlockingConnection's
1388        underlying asynchronous connection adapter (SelectConnection) uses
1389        callbacks to communicate with us. If BlockingConnection leaks exceptions
1390        back into the I/O loop or the asynchronous connection adapter, we
1391        interrupt their normal workflow and introduce a high likelihood of state
1392        inconsistency.
1393
1394        See `pika.Channel.add_on_close_callback()` for additional documentation.
1395
1396        :param pika.Channel _channel: (unused)
1397        :param Exception reason:
1398
1399        """
1400        LOGGER.debug('_on_channel_closed: %r; %r', reason, self)
1401
1402        self._closing_reason = reason
1403
1404        if isinstance(reason, exceptions.ChannelClosedByBroker):
1405            self._cleanup()
1406
1407            # Request urgent termination of `process_data_events()`, in case
1408            # it's executing or next time it will execute
1409            self.connection._request_channel_dispatch(-self.channel_number)
1410
1411    def _on_consumer_cancelled_by_broker(self, method_frame):
1412        """Called by impl when broker cancels consumer via Basic.Cancel.
1413
1414        This is a RabbitMQ-specific feature. The circumstances include deletion
1415        of queue being consumed as well as failure of a HA node responsible for
1416        the queue being consumed.
1417
1418        :param pika.frame.Method method_frame: method frame with the
1419            `spec.Basic.Cancel` method
1420
1421        """
1422        evt = _ConsumerCancellationEvt(method_frame)
1423
1424        consumer = self._consumer_infos[method_frame.method.consumer_tag]
1425
1426        # Don't interfere with client-initiated cancellation flow
1427        if not consumer.tearing_down:
1428            consumer.state = _ConsumerInfo.CANCELLED_BY_BROKER
1429
1430        if consumer.alternate_event_sink is not None:
1431            consumer.alternate_event_sink(evt)
1432        else:
1433            self._add_pending_event(evt)
1434
1435    def _on_consumer_message_delivery(self, _channel, method, properties, body):
1436        """Called by impl when a message is delivered for a consumer
1437
1438        :param Channel channel: The implementation channel object
1439        :param spec.Basic.Deliver method:
1440        :param pika.spec.BasicProperties properties: message properties
1441        :param bytes body: delivered message body; empty string if no body
1442        """
1443        evt = _ConsumerDeliveryEvt(method, properties, body)
1444
1445        consumer = self._consumer_infos[method.consumer_tag]
1446
1447        if consumer.alternate_event_sink is not None:
1448            consumer.alternate_event_sink(evt)
1449        else:
1450            self._add_pending_event(evt)
1451
1452    def _on_consumer_generator_event(self, evt):
1453        """Sink for the queue consumer generator's consumer events; append the
1454        event to queue consumer generator's pending events buffer.
1455
1456        :param evt: an object of type _ConsumerDeliveryEvt or
1457          _ConsumerCancellationEvt
1458        """
1459        self._queue_consumer_generator.pending_events.append(evt)
1460        # Schedule termination of connection.process_data_events using a
1461        # negative channel number
1462        self.connection._request_channel_dispatch(-self.channel_number)
1463
1464    def _cancel_all_consumers(self):
1465        """Cancel all consumers.
1466
1467        NOTE: pending non-ackable messages will be lost; pending ackable
1468        messages will be rejected.
1469
1470        """
1471        if self._consumer_infos:
1472            LOGGER.debug('Cancelling %i consumers', len(self._consumer_infos))
1473
1474            if self._queue_consumer_generator is not None:
1475                # Cancel queue consumer generator
1476                self.cancel()
1477
1478            # Cancel consumers created via basic_consume
1479            for consumer_tag in compat.dictkeys(self._consumer_infos):
1480                self.basic_cancel(consumer_tag)
1481
1482    def _dispatch_events(self):
1483        """Called by BlockingConnection to dispatch pending events.
1484
1485        `BlockingChannel` schedules this callback via
1486        `BlockingConnection._request_channel_dispatch`
1487        """
1488        while self._pending_events:
1489            evt = self._pending_events.popleft()
1490
1491            if type(evt) is _ConsumerDeliveryEvt:  # pylint: disable=C0123
1492                consumer_info = self._consumer_infos[evt.method.consumer_tag]
1493                consumer_info.on_message_callback(self, evt.method,
1494                                                  evt.properties, evt.body)
1495
1496            elif type(evt) is _ConsumerCancellationEvt:  # pylint: disable=C0123
1497                del self._consumer_infos[evt.method_frame.method.consumer_tag]
1498
1499                self._impl.callbacks.process(self.channel_number,
1500                                             self._CONSUMER_CANCELLED_CB_KEY,
1501                                             self, evt.method_frame)
1502            else:
1503                evt.dispatch()
1504
1505    def close(self, reply_code=0, reply_text="Normal shutdown"):
1506        """Will invoke a clean shutdown of the channel with the AMQP Broker.
1507
1508        :param int reply_code: The reply code to close the channel with
1509        :param str reply_text: The reply text to close the channel with
1510
1511        """
1512        LOGGER.debug('Channel.close(%s, %s)', reply_code, reply_text)
1513
1514        self._impl._raise_if_not_open()
1515
1516        try:
1517            # Cancel remaining consumers
1518            self._cancel_all_consumers()
1519
1520            # Close the channel
1521            self._impl.close(reply_code=reply_code, reply_text=reply_text)
1522            self._flush_output(lambda: self.is_closed)
1523        finally:
1524            self._cleanup()
1525
1526    def flow(self, active):
1527        """Turn Channel flow control off and on.
1528
1529        NOTE: RabbitMQ doesn't support active=False; per
1530        https://www.rabbitmq.com/specification.html: "active=false is not
1531        supported by the server. Limiting prefetch with basic.qos provides much
1532        better control"
1533
1534        For more information, please reference:
1535
1536        http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
1537
1538        :param bool active: Turn flow on (True) or off (False)
1539        :returns: True if broker will start or continue sending; False if not
1540        :rtype: bool
1541
1542        """
1543        with _CallbackResult(self._FlowOkCallbackResultArgs) as flow_ok_result:
1544            self._impl.flow(
1545                active=active, callback=flow_ok_result.set_value_once)
1546            self._flush_output(flow_ok_result.is_ready)
1547            return flow_ok_result.value.active
1548
1549    def add_on_cancel_callback(self, callback):
1550        """Pass a callback function that will be called when Basic.Cancel
1551        is sent by the broker. The callback function should receive a method
1552        frame parameter.
1553
1554        :param callable callback: a callable for handling broker's Basic.Cancel
1555            notification with the call signature: callback(method_frame)
1556            where method_frame is of type `pika.frame.Method` with method of
1557            type `spec.Basic.Cancel`
1558
1559        """
1560        self._impl.callbacks.add(
1561            self.channel_number,
1562            self._CONSUMER_CANCELLED_CB_KEY,
1563            callback,
1564            one_shot=False)
1565
1566    def add_on_return_callback(self, callback):
1567        """Pass a callback function that will be called when a published
1568        message is rejected and returned by the server via `Basic.Return`.
1569
1570        :param callable callback: The method to call on callback with the
1571            signature callback(channel, method, properties, body), where
1572            channel: pika.Channel
1573            method: pika.spec.Basic.Return
1574            properties: pika.spec.BasicProperties
1575            body: bytes
1576
1577        """
1578        self._impl.add_on_return_callback(
1579            lambda _channel, method, properties, body: (
1580                self._add_pending_event(
1581                    _ReturnedMessageEvt(
1582                        callback, self, method, properties, body))))
1583
1584    def basic_consume(self,
1585                      queue,
1586                      on_message_callback,
1587                      auto_ack=False,
1588                      exclusive=False,
1589                      consumer_tag=None,
1590                      arguments=None):
1591        """Sends the AMQP command Basic.Consume to the broker and binds messages
1592        for the consumer_tag to the consumer callback. If you do not pass in
1593        a consumer_tag, one will be automatically generated for you. Returns
1594        the consumer tag.
1595
1596        NOTE: the consumer callbacks are dispatched only in the scope of
1597        specially-designated methods: see
1598        `BlockingConnection.process_data_events` and
1599        `BlockingChannel.start_consuming`.
1600
1601        For more information about Basic.Consume, see:
1602        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
1603
1604        :param str queue: The queue from which to consume
1605        :param callable on_message_callback: Required function for dispatching messages
1606            to user, having the signature:
1607            on_message_callback(channel, method, properties, body)
1608                channel: BlockingChannel
1609                method: spec.Basic.Deliver
1610                properties: spec.BasicProperties
1611                body: bytes
1612        :param bool auto_ack: if set to True, automatic acknowledgement mode will be used
1613                              (see http://www.rabbitmq.com/confirms.html). This corresponds
1614                              with the 'no_ack' parameter in the basic.consume AMQP 0.9.1
1615                              method
1616        :param bool exclusive: Don't allow other consumers on the queue
1617        :param str consumer_tag: You may specify your own consumer tag; if left
1618          empty, a consumer tag will be generated automatically
1619        :param dict arguments: Custom key/value pair arguments for the consumer
1620        :returns: consumer tag
1621        :rtype: str
1622        :raises pika.exceptions.DuplicateConsumerTag: if consumer with given
1623            consumer_tag is already present.
1624
1625        """
1626        validators.require_string(queue, 'queue')
1627        validators.require_callback(on_message_callback, 'on_message_callback')
1628        return self._basic_consume_impl(
1629            queue=queue,
1630            on_message_callback=on_message_callback,
1631            auto_ack=auto_ack,
1632            exclusive=exclusive,
1633            consumer_tag=consumer_tag,
1634            arguments=arguments)
1635
1636    def _basic_consume_impl(self,
1637                            queue,
1638                            auto_ack,
1639                            exclusive,
1640                            consumer_tag,
1641                            arguments=None,
1642                            on_message_callback=None,
1643                            alternate_event_sink=None):
1644        """The low-level implementation used by `basic_consume` and `consume`.
1645        See `basic_consume` docstring for more info.
1646
1647        NOTE: exactly one of on_message_callback/alternate_event_sink musts be
1648        non-None.
1649
1650        This method has one additional parameter alternate_event_sink over the
1651        args described in `basic_consume`.
1652
1653        :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt
1654            and _ConsumerCancellationEvt objects will be diverted to this
1655            callback instead of being deposited in the channel's
1656            `_pending_events` container. Signature:
1657            alternate_event_sink(evt)
1658
1659        :raises pika.exceptions.DuplicateConsumerTag: if consumer with given
1660            consumer_tag is already present.
1661
1662        """
1663        if (on_message_callback is None) == (alternate_event_sink is None):
1664            raise ValueError(
1665                ('exactly one of on_message_callback/alternate_event_sink must '
1666                 'be non-None', on_message_callback, alternate_event_sink))
1667
1668        if not consumer_tag:
1669            # Need a consumer tag to register consumer info before sending
1670            # request to broker, because I/O might dispatch incoming messages
1671            # immediately following Basic.Consume-ok before _flush_output
1672            # returns
1673            consumer_tag = self._impl._generate_consumer_tag()
1674
1675        if consumer_tag in self._consumer_infos:
1676            raise exceptions.DuplicateConsumerTag(consumer_tag)
1677
1678        # Create new consumer
1679        self._consumer_infos[consumer_tag] = _ConsumerInfo(
1680            consumer_tag,
1681            auto_ack=auto_ack,
1682            on_message_callback=on_message_callback,
1683            alternate_event_sink=alternate_event_sink)
1684
1685        try:
1686            with self._basic_consume_ok_result as ok_result:
1687                tag = self._impl.basic_consume(
1688                    on_message_callback=self._on_consumer_message_delivery,
1689                    queue=queue,
1690                    auto_ack=auto_ack,
1691                    exclusive=exclusive,
1692                    consumer_tag=consumer_tag,
1693                    arguments=arguments)
1694
1695                assert tag == consumer_tag, (tag, consumer_tag)
1696
1697                self._flush_output(ok_result.is_ready)
1698        except Exception:
1699            # If channel was closed, self._consumer_infos will be empty
1700            if consumer_tag in self._consumer_infos:
1701                del self._consumer_infos[consumer_tag]
1702                # Schedule termination of connection.process_data_events using a
1703                # negative channel number
1704                self.connection._request_channel_dispatch(-self.channel_number)
1705            raise
1706
1707        # NOTE: Consumer could get cancelled by broker immediately after opening
1708        # (e.g., queue getting deleted externally)
1709        if self._consumer_infos[consumer_tag].setting_up:
1710            self._consumer_infos[consumer_tag].state = _ConsumerInfo.ACTIVE
1711
1712        return consumer_tag
1713
1714    def basic_cancel(self, consumer_tag):
1715        """This method cancels a consumer. This does not affect already
1716        delivered messages, but it does mean the server will not send any more
1717        messages for that consumer. The client may receive an arbitrary number
1718        of messages in between sending the cancel method and receiving the
1719        cancel-ok reply.
1720
1721        NOTE: When cancelling an auto_ack=False consumer, this implementation
1722        automatically Nacks and suppresses any incoming messages that have not
1723        yet been dispatched to the consumer's callback. However, when cancelling
1724        a auto_ack=True consumer, this method will return any pending messages
1725        that arrived before broker confirmed the cancellation.
1726
1727        :param str consumer_tag: Identifier for the consumer; the result of
1728            passing a consumer_tag that was created on another channel is
1729            undefined (bad things will happen)
1730        :returns: (NEW IN pika 0.10.0) empty sequence for a auto_ack=False
1731            consumer; for a auto_ack=True consumer, returns a (possibly empty)
1732            sequence of pending messages that arrived before broker confirmed
1733            the cancellation (this is done instead of via consumer's callback in
1734            order to prevent reentrancy/recursion. Each message is four-tuple:
1735            (channel, method, properties, body)
1736                channel: BlockingChannel
1737                method: spec.Basic.Deliver
1738                properties: spec.BasicProperties
1739                body: bytes
1740        :rtype: list
1741        """
1742        try:
1743            consumer_info = self._consumer_infos[consumer_tag]
1744        except KeyError:
1745            LOGGER.warning(
1746                "User is attempting to cancel an unknown consumer=%s; "
1747                "already cancelled by user or broker?", consumer_tag)
1748            return []
1749
1750        try:
1751            # Assertion failure here is most likely due to reentrance
1752            assert consumer_info.active or consumer_info.cancelled_by_broker, (
1753                consumer_info.state)
1754
1755            # Assertion failure here signals disconnect between consumer state
1756            # in BlockingChannel and Channel
1757            assert (consumer_info.cancelled_by_broker or
1758                    consumer_tag in self._impl._consumers), consumer_tag
1759
1760            auto_ack = consumer_info.auto_ack
1761
1762            consumer_info.state = _ConsumerInfo.TEARING_DOWN
1763
1764            with _CallbackResult() as cancel_ok_result:
1765                # Nack pending messages for auto_ack=False consumer
1766                if not auto_ack:
1767                    pending_messages = self._remove_pending_deliveries(
1768                        consumer_tag)
1769                    if pending_messages:
1770                        # NOTE: we use impl's basic_reject to avoid the
1771                        # possibility of redelivery before basic_cancel takes
1772                        # control of nacking.
1773                        # NOTE: we can't use basic_nack with the multiple option
1774                        # to avoid nacking messages already held by our client.
1775                        for message in pending_messages:
1776                            self._impl.basic_reject(
1777                                message.method.delivery_tag, requeue=True)
1778
1779                # Cancel the consumer; impl takes care of rejecting any
1780                # additional deliveries that arrive for a auto_ack=False
1781                # consumer
1782                self._impl.basic_cancel(
1783                    consumer_tag=consumer_tag,
1784                    callback=cancel_ok_result.signal_once)
1785
1786                # Flush output and wait for Basic.Cancel-ok or
1787                # broker-initiated Basic.Cancel
1788                self._flush_output(
1789                    cancel_ok_result.is_ready,
1790                    lambda: consumer_tag not in self._impl._consumers)
1791
1792            if auto_ack:
1793                # Return pending messages for auto_ack=True consumer
1794                return [(evt.method, evt.properties, evt.body)
1795                        for evt in self._remove_pending_deliveries(consumer_tag)
1796                       ]
1797            else:
1798                # impl takes care of rejecting any incoming deliveries during
1799                # cancellation
1800                messages = self._remove_pending_deliveries(consumer_tag)
1801                assert not messages, messages
1802
1803                return []
1804        finally:
1805            # NOTE: The entry could be purged if channel or connection closes
1806            if consumer_tag in self._consumer_infos:
1807                del self._consumer_infos[consumer_tag]
1808                # Schedule termination of connection.process_data_events using a
1809                # negative channel number
1810                self.connection._request_channel_dispatch(-self.channel_number)
1811
1812    def _remove_pending_deliveries(self, consumer_tag):
1813        """Extract _ConsumerDeliveryEvt objects destined for the given consumer
1814        from pending events, discarding the _ConsumerCancellationEvt, if any
1815
1816        :param str consumer_tag:
1817
1818        :returns: a (possibly empty) sequence of _ConsumerDeliveryEvt destined
1819            for the given consumer tag
1820        :rtype: list
1821        """
1822        remaining_events = deque()
1823        unprocessed_messages = []
1824        while self._pending_events:
1825            evt = self._pending_events.popleft()
1826            if type(evt) is _ConsumerDeliveryEvt:  # pylint: disable=C0123
1827                if evt.method.consumer_tag == consumer_tag:
1828                    unprocessed_messages.append(evt)
1829                    continue
1830            if type(evt) is _ConsumerCancellationEvt:  # pylint: disable=C0123
1831                if evt.method_frame.method.consumer_tag == consumer_tag:
1832                    # A broker-initiated Basic.Cancel must have arrived
1833                    # before our cancel request completed
1834                    continue
1835
1836            remaining_events.append(evt)
1837
1838        self._pending_events = remaining_events
1839
1840        return unprocessed_messages
1841
1842    def start_consuming(self):
1843        """Processes I/O events and dispatches timers and `basic_consume`
1844        callbacks until all consumers are cancelled.
1845
1846        NOTE: this blocking function may not be called from the scope of a
1847        pika callback, because dispatching `basic_consume` callbacks from this
1848        context would constitute recursion.
1849
1850        :raises pika.exceptions.ReentrancyError: if called from the scope of a
1851            `BlockingConnection` or `BlockingChannel` callback
1852        :raises ChannelClosed: when this channel is closed by broker.
1853        """
1854        # Check if called from the scope of an event dispatch callback
1855        with self.connection._acquire_event_dispatch() as dispatch_allowed:
1856            if not dispatch_allowed:
1857                raise exceptions.ReentrancyError(
1858                    'start_consuming may not be called from the scope of '
1859                    'another BlockingConnection or BlockingChannel callback')
1860
1861        self._impl._raise_if_not_open()
1862
1863        # Process events as long as consumers exist on this channel
1864        while self._consumer_infos:
1865            # This will raise ChannelClosed if channel is closed by broker
1866            self._process_data_events(time_limit=None)
1867
1868    def stop_consuming(self, consumer_tag=None):
1869        """ Cancels all consumers, signalling the `start_consuming` loop to
1870        exit.
1871
1872        NOTE: pending non-ackable messages will be lost; pending ackable
1873        messages will be rejected.
1874
1875        """
1876        if consumer_tag:
1877            self.basic_cancel(consumer_tag)
1878        else:
1879            self._cancel_all_consumers()
1880
1881    def consume(self,
1882                queue,
1883                auto_ack=False,
1884                exclusive=False,
1885                arguments=None,
1886                inactivity_timeout=None):
1887        """Blocking consumption of a queue instead of via a callback. This
1888        method is a generator that yields each message as a tuple of method,
1889        properties, and body. The active generator iterator terminates when the
1890        consumer is cancelled by client via `BlockingChannel.cancel()` or by
1891        broker.
1892
1893        Example:
1894
1895            for method, properties, body in channel.consume('queue'):
1896                print body
1897                channel.basic_ack(method.delivery_tag)
1898
1899        You should call `BlockingChannel.cancel()` when you escape out of the
1900        generator loop.
1901
1902        If you don't cancel this consumer, then next call on the same channel
1903        to `consume()` with the exact same (queue, auto_ack, exclusive) parameters
1904        will resume the existing consumer generator; however, calling with
1905        different parameters will result in an exception.
1906
1907        :param str queue: The queue name to consume
1908        :param bool auto_ack: Tell the broker to not expect a ack/nack response
1909        :param bool exclusive: Don't allow other consumers on the queue
1910        :param dict arguments: Custom key/value pair arguments for the consumer
1911        :param float inactivity_timeout: if a number is given (in
1912            seconds), will cause the method to yield (None, None, None) after the
1913            given period of inactivity; this permits for pseudo-regular maintenance
1914            activities to be carried out by the user while waiting for messages
1915            to arrive. If None is given (default), then the method blocks until
1916            the next event arrives. NOTE that timing granularity is limited by
1917            the timer resolution of the underlying implementation.
1918            NEW in pika 0.10.0.
1919
1920        :yields: tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode)
1921
1922        :raises ValueError: if consumer-creation parameters don't match those
1923            of the existing queue consumer generator, if any.
1924            NEW in pika 0.10.0
1925        :raises ChannelClosed: when this channel is closed by broker.
1926
1927        """
1928        self._impl._raise_if_not_open()
1929
1930        params = (queue, auto_ack, exclusive)
1931
1932        if self._queue_consumer_generator is not None:
1933            if params != self._queue_consumer_generator.params:
1934                raise ValueError(
1935                    'Consume with different params not allowed on existing '
1936                    'queue consumer generator; previous params: %r; '
1937                    'new params: %r' % (self._queue_consumer_generator.params,
1938                                        (queue, auto_ack, exclusive)))
1939        else:
1940            LOGGER.debug('Creating new queue consumer generator; params: %r',
1941                         params)
1942            # Need a consumer tag to register consumer info before sending
1943            # request to broker, because I/O might pick up incoming messages
1944            # in addition to Basic.Consume-ok
1945            consumer_tag = self._impl._generate_consumer_tag()
1946
1947            self._queue_consumer_generator = _QueueConsumerGeneratorInfo(
1948                params, consumer_tag)
1949
1950            try:
1951                self._basic_consume_impl(
1952                    queue=queue,
1953                    auto_ack=auto_ack,
1954                    exclusive=exclusive,
1955                    consumer_tag=consumer_tag,
1956                    arguments=arguments,
1957                    alternate_event_sink=self._on_consumer_generator_event)
1958            except Exception:
1959                self._queue_consumer_generator = None
1960                raise
1961
1962            LOGGER.info('Created new queue consumer generator %r',
1963                        self._queue_consumer_generator)
1964
1965        while self._queue_consumer_generator is not None:
1966            # Process pending events
1967            if self._queue_consumer_generator.pending_events:
1968                evt = self._queue_consumer_generator.pending_events.popleft()
1969                if type(evt) is _ConsumerCancellationEvt:  # pylint: disable=C0123
1970                    # Consumer was cancelled by broker
1971                    self._queue_consumer_generator = None
1972                    break
1973                else:
1974                    yield (evt.method, evt.properties, evt.body)
1975                    continue
1976
1977            if inactivity_timeout is None:
1978                # Wait indefinitely for a message to arrive, while processing
1979                # I/O events and triggering ChannelClosed exception when the
1980                # channel fails
1981                self._process_data_events(time_limit=None)
1982                continue
1983
1984            # Wait with inactivity timeout
1985            wait_start_time = compat.time_now()
1986            wait_deadline = wait_start_time + inactivity_timeout
1987            delta = inactivity_timeout
1988
1989            while (self._queue_consumer_generator is not None and
1990                   not self._queue_consumer_generator.pending_events):
1991
1992                self._process_data_events(time_limit=delta)
1993
1994                if not self._queue_consumer_generator:
1995                    # Consumer was cancelled by client
1996                    break
1997
1998                if self._queue_consumer_generator.pending_events:
1999                    # Got message(s)
2000                    break
2001
2002                delta = wait_deadline - compat.time_now()
2003                if delta <= 0.0:
2004                    # Signal inactivity timeout
2005                    yield (None, None, None)
2006                    break
2007
2008    def _process_data_events(self, time_limit):
2009        """Wrapper for `BlockingConnection.process_data_events()` with common
2010        channel-specific logic that raises ChannelClosed if broker closed this
2011        channel.
2012
2013        NOTE: We need to raise an exception in the context of user's call into
2014        our API to protect the integrity of the underlying implementation.
2015        BlockingConnection's underlying asynchronous connection adapter
2016        (SelectConnection) uses callbacks to communicate with us. If
2017        BlockingConnection leaks exceptions back into the I/O loop or the
2018        asynchronous connection adapter, we interrupt their normal workflow and
2019        introduce a high likelihood of state inconsistency.
2020
2021        See `BlockingConnection.process_data_events()` for documentation of args
2022        and behavior.
2023
2024        :param float time_limit:
2025
2026        """
2027        self.connection.process_data_events(time_limit=time_limit)
2028        if self.is_closed and isinstance(self._closing_reason,
2029                                         exceptions.ChannelClosedByBroker):
2030            LOGGER.debug('Channel close by broker detected, raising %r; %r',
2031                         self._closing_reason, self)
2032            raise self._closing_reason  # pylint: disable=E0702
2033
2034    def get_waiting_message_count(self):
2035        """Returns the number of messages that may be retrieved from the current
2036        queue consumer generator via `BlockingChannel.consume` without blocking.
2037        NEW in pika 0.10.0
2038
2039        :returns: The number of waiting messages
2040        :rtype: int
2041        """
2042        if self._queue_consumer_generator is not None:
2043            pending_events = self._queue_consumer_generator.pending_events
2044            count = len(pending_events)
2045            if count and type(pending_events[-1]) is _ConsumerCancellationEvt:  # pylint: disable=C0123
2046                count -= 1
2047        else:
2048            count = 0
2049
2050        return count
2051
2052    def cancel(self):
2053        """Cancel the queue consumer created by `BlockingChannel.consume`,
2054        rejecting all pending ackable messages.
2055
2056        NOTE: If you're looking to cancel a consumer issued with
2057        BlockingChannel.basic_consume then you should call
2058        BlockingChannel.basic_cancel.
2059
2060        :returns: The number of messages requeued by Basic.Nack.
2061            NEW in 0.10.0: returns 0
2062        :rtype: int
2063
2064        """
2065        if self._queue_consumer_generator is None:
2066            LOGGER.warning('cancel: queue consumer generator is inactive '
2067                           '(already cancelled by client or broker?)')
2068            return 0
2069
2070        try:
2071            _, auto_ack, _ = self._queue_consumer_generator.params
2072            if not auto_ack:
2073                # Reject messages held by queue consumer generator; NOTE: we
2074                # can't use basic_nack with the multiple option to avoid nacking
2075                # messages already held by our client.
2076                pending_events = self._queue_consumer_generator.pending_events
2077                # NOTE `get_waiting_message_count` adjusts for `Basic.Cancel`
2078                #      from the server at the end (if any)
2079                for _ in compat.xrange(self.get_waiting_message_count()):
2080                    evt = pending_events.popleft()
2081                    self._impl.basic_reject(
2082                        evt.method.delivery_tag, requeue=True)
2083
2084            self.basic_cancel(self._queue_consumer_generator.consumer_tag)
2085        finally:
2086            self._queue_consumer_generator = None
2087
2088        # Return 0 for compatibility with legacy implementation; the number of
2089        # nacked messages is not meaningful since only messages consumed with
2090        # auto_ack=False may be nacked, and those arriving after calling
2091        # basic_cancel will be rejected automatically by impl channel, so we'll
2092        # never know how many of those were nacked.
2093        return 0
2094
2095    def basic_ack(self, delivery_tag=0, multiple=False):
2096        """Acknowledge one or more messages. When sent by the client, this
2097        method acknowledges one or more messages delivered via the Deliver or
2098        Get-Ok methods. When sent by server, this method acknowledges one or
2099        more messages published with the Publish method on a channel in
2100        confirm mode. The acknowledgement can be for a single message or a
2101        set of messages up to and including a specific message.
2102
2103        :param int delivery-tag: The server-assigned delivery tag
2104        :param bool multiple: If set to True, the delivery tag is treated as
2105                              "up to and including", so that multiple messages
2106                              can be acknowledged with a single method. If set
2107                              to False, the delivery tag refers to a single
2108                              message. If the multiple field is 1, and the
2109                              delivery tag is zero, this indicates
2110                              acknowledgement of all outstanding messages.
2111        """
2112        self._impl.basic_ack(delivery_tag=delivery_tag, multiple=multiple)
2113        self._flush_output()
2114
2115    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
2116        """This method allows a client to reject one or more incoming messages.
2117        It can be used to interrupt and cancel large incoming messages, or
2118        return untreatable messages to their original queue.
2119
2120        :param int delivery-tag: The server-assigned delivery tag
2121        :param bool multiple: If set to True, the delivery tag is treated as
2122                              "up to and including", so that multiple messages
2123                              can be acknowledged with a single method. If set
2124                              to False, the delivery tag refers to a single
2125                              message. If the multiple field is 1, and the
2126                              delivery tag is zero, this indicates
2127                              acknowledgement of all outstanding messages.
2128        :param bool requeue: If requeue is true, the server will attempt to
2129                             requeue the message. If requeue is false or the
2130                             requeue attempt fails the messages are discarded or
2131                             dead-lettered.
2132
2133        """
2134        self._impl.basic_nack(
2135            delivery_tag=delivery_tag, multiple=multiple, requeue=requeue)
2136        self._flush_output()
2137
2138    def basic_get(self, queue, auto_ack=False):
2139        """Get a single message from the AMQP broker. Returns a sequence with
2140        the method frame, message properties, and body.
2141
2142        :param str queue: Name of queue from which to get a message
2143        :param bool auto_ack: Tell the broker to not expect a reply
2144        :returns: a three-tuple; (None, None, None) if the queue was empty;
2145            otherwise (method, properties, body); NOTE: body may be None
2146        :rtype: (spec.Basic.GetOk|None, spec.BasicProperties|None, str|None)
2147        """
2148        assert not self._basic_getempty_result
2149
2150        validators.require_string(queue, 'queue')
2151
2152        # NOTE: nested with for python 2.6 compatibility
2153        with _CallbackResult(self._RxMessageArgs) as get_ok_result:
2154            with self._basic_getempty_result:
2155                self._impl.basic_get(
2156                    queue=queue,
2157                    auto_ack=auto_ack,
2158                    callback=get_ok_result.set_value_once)
2159                self._flush_output(get_ok_result.is_ready,
2160                                   self._basic_getempty_result.is_ready)
2161                if get_ok_result:
2162                    evt = get_ok_result.value
2163                    return evt.method, evt.properties, evt.body
2164                else:
2165                    assert self._basic_getempty_result, (
2166                        "wait completed without GetOk and GetEmpty")
2167                    return None, None, None
2168
2169    def basic_publish(self,
2170                      exchange,
2171                      routing_key,
2172                      body,
2173                      properties=None,
2174                      mandatory=False):
2175        """Publish to the channel with the given exchange, routing key, and
2176        body.
2177
2178        For more information on basic_publish and what the parameters do, see:
2179
2180            http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
2181
2182        NOTE: mandatory may be enabled even without delivery
2183          confirmation, but in the absence of delivery confirmation the
2184          synchronous implementation has no way to know how long to wait for
2185          the Basic.Return.
2186
2187        :param str exchange: The exchange to publish to
2188        :param str routing_key: The routing key to bind on
2189        :param bytes body: The message body; empty string if no body
2190        :param pika.spec.BasicProperties properties: message properties
2191        :param bool mandatory: The mandatory flag
2192
2193        :raises UnroutableError: raised when a message published in
2194            publisher-acknowledgments mode (see
2195            `BlockingChannel.confirm_delivery`) is returned via `Basic.Return`
2196            followed by `Basic.Ack`.
2197        :raises NackError: raised when a message published in
2198            publisher-acknowledgements mode is Nack'ed by the broker. See
2199            `BlockingChannel.confirm_delivery`.
2200
2201        """
2202        if self._delivery_confirmation:
2203            # In publisher-acknowledgments mode
2204            with self._message_confirmation_result:
2205                self._impl.basic_publish(
2206                    exchange=exchange,
2207                    routing_key=routing_key,
2208                    body=body,
2209                    properties=properties,
2210                    mandatory=mandatory)
2211
2212                self._flush_output(self._message_confirmation_result.is_ready)
2213                conf_method = (
2214                    self._message_confirmation_result.value.method_frame.method)
2215
2216                if isinstance(conf_method, pika.spec.Basic.Nack):
2217                    # Broker was unable to process the message due to internal
2218                    # error
2219                    LOGGER.warning(
2220                        "Message was Nack'ed by broker: nack=%r; channel=%s; "
2221                        "exchange=%s; routing_key=%s; mandatory=%r; ",
2222                        conf_method, self.channel_number, exchange, routing_key,
2223                        mandatory)
2224                    if self._puback_return is not None:
2225                        returned_messages = [self._puback_return]
2226                        self._puback_return = None
2227                    else:
2228                        returned_messages = []
2229                    raise exceptions.NackError(returned_messages)
2230
2231                else:
2232                    assert isinstance(conf_method,
2233                                      pika.spec.Basic.Ack), (conf_method)
2234
2235                    if self._puback_return is not None:
2236                        # Unroutable message was returned
2237                        messages = [self._puback_return]
2238                        self._puback_return = None
2239                        raise exceptions.UnroutableError(messages)
2240        else:
2241            # In non-publisher-acknowledgments mode
2242            self._impl.basic_publish(
2243                exchange=exchange,
2244                routing_key=routing_key,
2245                body=body,
2246                properties=properties,
2247                mandatory=mandatory)
2248            self._flush_output()
2249
2250    def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
2251        """Specify quality of service. This method requests a specific quality
2252        of service. The QoS can be specified for the current channel or for all
2253        channels on the connection. The client can request that messages be sent
2254        in advance so that when the client finishes processing a message, the
2255        following message is already held locally, rather than needing to be
2256        sent down the channel. Prefetching gives a performance improvement.
2257
2258        :param int prefetch_size:  This field specifies the prefetch window
2259                                   size. The server will send a message in
2260                                   advance if it is equal to or smaller in size
2261                                   than the available prefetch size (and also
2262                                   falls into other prefetch limits). May be set
2263                                   to zero, meaning "no specific limit",
2264                                   although other prefetch limits may still
2265                                   apply. The prefetch-size is ignored if the
2266                                   no-ack option is set in the consumer.
2267        :param int prefetch_count: Specifies a prefetch window in terms of whole
2268                                   messages. This field may be used in
2269                                   combination with the prefetch-size field; a
2270                                   message will only be sent in advance if both
2271                                   prefetch windows (and those at the channel
2272                                   and connection level) allow it. The
2273                                   prefetch-count is ignored if the no-ack
2274                                   option is set in the consumer.
2275        :param bool global_qos:    Should the QoS apply to all channels on the
2276                                   connection.
2277
2278        """
2279        with _CallbackResult() as qos_ok_result:
2280            self._impl.basic_qos(
2281                callback=qos_ok_result.signal_once,
2282                prefetch_size=prefetch_size,
2283                prefetch_count=prefetch_count,
2284                global_qos=global_qos)
2285            self._flush_output(qos_ok_result.is_ready)
2286
2287    def basic_recover(self, requeue=False):
2288        """This method asks the server to redeliver all unacknowledged messages
2289        on a specified channel. Zero or more messages may be redelivered. This
2290        method replaces the asynchronous Recover.
2291
2292        :param bool requeue: If False, the message will be redelivered to the
2293                             original recipient. If True, the server will
2294                             attempt to requeue the message, potentially then
2295                             delivering it to an alternative subscriber.
2296
2297        """
2298        with _CallbackResult() as recover_ok_result:
2299            self._impl.basic_recover(
2300                requeue=requeue, callback=recover_ok_result.signal_once)
2301            self._flush_output(recover_ok_result.is_ready)
2302
2303    def basic_reject(self, delivery_tag=None, requeue=True):
2304        """Reject an incoming message. This method allows a client to reject a
2305        message. It can be used to interrupt and cancel large incoming messages,
2306        or return untreatable messages to their original queue.
2307
2308        :param int delivery-tag: The server-assigned delivery tag
2309        :param bool requeue: If requeue is true, the server will attempt to
2310                             requeue the message. If requeue is false or the
2311                             requeue attempt fails the messages are discarded or
2312                             dead-lettered.
2313
2314        """
2315        self._impl.basic_reject(delivery_tag=delivery_tag, requeue=requeue)
2316        self._flush_output()
2317
2318    def confirm_delivery(self):
2319        """Turn on RabbitMQ-proprietary Confirm mode in the channel.
2320
2321        For more information see:
2322            https://www.rabbitmq.com/confirms.html
2323        """
2324        if self._delivery_confirmation:
2325            LOGGER.error(
2326                'confirm_delivery: confirmation was already enabled '
2327                'on channel=%s', self.channel_number)
2328            return
2329
2330        with _CallbackResult() as select_ok_result:
2331            self._impl.confirm_delivery(
2332                ack_nack_callback=self._message_confirmation_result.
2333                set_value_once,
2334                callback=select_ok_result.signal_once)
2335
2336            self._flush_output(select_ok_result.is_ready)
2337
2338        self._delivery_confirmation = True
2339
2340        # Unroutable messages returned after this point will be in the context
2341        # of publisher acknowledgments
2342        self._impl.add_on_return_callback(self._on_puback_message_returned)
2343
2344    def exchange_declare(self,
2345                         exchange,
2346                         exchange_type='direct',
2347                         passive=False,
2348                         durable=False,
2349                         auto_delete=False,
2350                         internal=False,
2351                         arguments=None):
2352        """This method creates an exchange if it does not already exist, and if
2353        the exchange exists, verifies that it is of the correct and expected
2354        class.
2355
2356        If passive set, the server will reply with Declare-Ok if the exchange
2357        already exists with the same name, and raise an error if not and if the
2358        exchange does not already exist, the server MUST raise a channel
2359        exception with reply code 404 (not found).
2360
2361        :param str exchange: The exchange name consists of a non-empty sequence of
2362                          these characters: letters, digits, hyphen, underscore,
2363                          period, or colon.
2364        :param str exchange_type: The exchange type to use
2365        :param bool passive: Perform a declare or just check to see if it exists
2366        :param bool durable: Survive a reboot of RabbitMQ
2367        :param bool auto_delete: Remove when no more queues are bound to it
2368        :param bool internal: Can only be published to by other exchanges
2369        :param dict arguments: Custom key/value pair arguments for the exchange
2370        :returns: Method frame from the Exchange.Declare-ok response
2371        :rtype: `pika.frame.Method` having `method` attribute of type
2372            `spec.Exchange.DeclareOk`
2373
2374        """
2375        validators.require_string(exchange, 'exchange')
2376        with _CallbackResult(
2377                self._MethodFrameCallbackResultArgs) as declare_ok_result:
2378            self._impl.exchange_declare(
2379                exchange=exchange,
2380                exchange_type=exchange_type,
2381                passive=passive,
2382                durable=durable,
2383                auto_delete=auto_delete,
2384                internal=internal,
2385                arguments=arguments,
2386                callback=declare_ok_result.set_value_once)
2387
2388            self._flush_output(declare_ok_result.is_ready)
2389            return declare_ok_result.value.method_frame
2390
2391    def exchange_delete(self, exchange=None, if_unused=False):
2392        """Delete the exchange.
2393
2394        :param str exchange: The exchange name
2395        :param bool if_unused: only delete if the exchange is unused
2396        :returns: Method frame from the Exchange.Delete-ok response
2397        :rtype: `pika.frame.Method` having `method` attribute of type
2398            `spec.Exchange.DeleteOk`
2399
2400        """
2401        with _CallbackResult(
2402                self._MethodFrameCallbackResultArgs) as delete_ok_result:
2403            self._impl.exchange_delete(
2404                exchange=exchange,
2405                if_unused=if_unused,
2406                callback=delete_ok_result.set_value_once)
2407
2408            self._flush_output(delete_ok_result.is_ready)
2409            return delete_ok_result.value.method_frame
2410
2411    def exchange_bind(self, destination, source, routing_key='',
2412                      arguments=None):
2413        """Bind an exchange to another exchange.
2414
2415        :param str destination: The destination exchange to bind
2416        :param str source: The source exchange to bind to
2417        :param str routing_key: The routing key to bind on
2418        :param dict arguments: Custom key/value pair arguments for the binding
2419        :returns: Method frame from the Exchange.Bind-ok response
2420        :rtype: `pika.frame.Method` having `method` attribute of type
2421          `spec.Exchange.BindOk`
2422
2423        """
2424        validators.require_string(destination, 'destination')
2425        validators.require_string(source, 'source')
2426        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2427                bind_ok_result:
2428            self._impl.exchange_bind(
2429                destination=destination,
2430                source=source,
2431                routing_key=routing_key,
2432                arguments=arguments,
2433                callback=bind_ok_result.set_value_once)
2434
2435            self._flush_output(bind_ok_result.is_ready)
2436            return bind_ok_result.value.method_frame
2437
2438    def exchange_unbind(self,
2439                        destination=None,
2440                        source=None,
2441                        routing_key='',
2442                        arguments=None):
2443        """Unbind an exchange from another exchange.
2444
2445        :param str destination: The destination exchange to unbind
2446        :param str source: The source exchange to unbind from
2447        :param str routing_key: The routing key to unbind
2448        :param dict arguments: Custom key/value pair arguments for the binding
2449        :returns: Method frame from the Exchange.Unbind-ok response
2450        :rtype: `pika.frame.Method` having `method` attribute of type
2451            `spec.Exchange.UnbindOk`
2452
2453        """
2454        with _CallbackResult(
2455                self._MethodFrameCallbackResultArgs) as unbind_ok_result:
2456            self._impl.exchange_unbind(
2457                destination=destination,
2458                source=source,
2459                routing_key=routing_key,
2460                arguments=arguments,
2461                callback=unbind_ok_result.set_value_once)
2462
2463            self._flush_output(unbind_ok_result.is_ready)
2464            return unbind_ok_result.value.method_frame
2465
2466    def queue_declare(self,
2467                      queue,
2468                      passive=False,
2469                      durable=False,
2470                      exclusive=False,
2471                      auto_delete=False,
2472                      arguments=None):
2473        """Declare queue, create if needed. This method creates or checks a
2474        queue. When creating a new queue the client can specify various
2475        properties that control the durability of the queue and its contents,
2476        and the level of sharing for the queue.
2477
2478        Use an empty string as the queue name for the broker to auto-generate
2479        one. Retrieve this auto-generated queue name from the returned
2480        `spec.Queue.DeclareOk` method frame.
2481
2482        :param str queue: The queue name; if empty string, the broker will
2483            create a unique queue name
2484        :param bool passive: Only check to see if the queue exists and raise
2485          `ChannelClosed` if it doesn't
2486        :param bool durable: Survive reboots of the broker
2487        :param bool exclusive: Only allow access by the current connection
2488        :param bool auto_delete: Delete after consumer cancels or disconnects
2489        :param dict arguments: Custom key/value arguments for the queue
2490        :returns: Method frame from the Queue.Declare-ok response
2491        :rtype: `pika.frame.Method` having `method` attribute of type
2492            `spec.Queue.DeclareOk`
2493
2494        """
2495        validators.require_string(queue, 'queue')
2496        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2497                declare_ok_result:
2498            self._impl.queue_declare(
2499                queue=queue,
2500                passive=passive,
2501                durable=durable,
2502                exclusive=exclusive,
2503                auto_delete=auto_delete,
2504                arguments=arguments,
2505                callback=declare_ok_result.set_value_once)
2506
2507            self._flush_output(declare_ok_result.is_ready)
2508            return declare_ok_result.value.method_frame
2509
2510    def queue_delete(self, queue, if_unused=False, if_empty=False):
2511        """Delete a queue from the broker.
2512
2513        :param str queue: The queue to delete
2514        :param bool if_unused: only delete if it's unused
2515        :param bool if_empty: only delete if the queue is empty
2516        :returns: Method frame from the Queue.Delete-ok response
2517        :rtype: `pika.frame.Method` having `method` attribute of type
2518            `spec.Queue.DeleteOk`
2519
2520        """
2521        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2522                delete_ok_result:
2523            self._impl.queue_delete(
2524                queue=queue,
2525                if_unused=if_unused,
2526                if_empty=if_empty,
2527                callback=delete_ok_result.set_value_once)
2528
2529            self._flush_output(delete_ok_result.is_ready)
2530            return delete_ok_result.value.method_frame
2531
2532    def queue_purge(self, queue):
2533        """Purge all of the messages from the specified queue
2534
2535        :param str queue: The queue to purge
2536        :returns: Method frame from the Queue.Purge-ok response
2537        :rtype: `pika.frame.Method` having `method` attribute of type
2538            `spec.Queue.PurgeOk`
2539
2540        """
2541        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2542                purge_ok_result:
2543            self._impl.queue_purge(
2544                queue=queue, callback=purge_ok_result.set_value_once)
2545            self._flush_output(purge_ok_result.is_ready)
2546            return purge_ok_result.value.method_frame
2547
2548    def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
2549        """Bind the queue to the specified exchange
2550
2551        :param str queue: The queue to bind to the exchange
2552        :param str exchange: The source exchange to bind to
2553        :param str routing_key: The routing key to bind on
2554        :param dict arguments: Custom key/value pair arguments for the binding
2555
2556        :returns: Method frame from the Queue.Bind-ok response
2557        :rtype: `pika.frame.Method` having `method` attribute of type
2558            `spec.Queue.BindOk`
2559
2560        """
2561        validators.require_string(queue, 'queue')
2562        validators.require_string(exchange, 'exchange')
2563        with _CallbackResult(
2564                self._MethodFrameCallbackResultArgs) as bind_ok_result:
2565            self._impl.queue_bind(
2566                queue=queue,
2567                exchange=exchange,
2568                routing_key=routing_key,
2569                arguments=arguments,
2570                callback=bind_ok_result.set_value_once)
2571            self._flush_output(bind_ok_result.is_ready)
2572            return bind_ok_result.value.method_frame
2573
2574    def queue_unbind(self,
2575                     queue,
2576                     exchange=None,
2577                     routing_key=None,
2578                     arguments=None):
2579        """Unbind a queue from an exchange.
2580
2581        :param str queue: The queue to unbind from the exchange
2582        :param str exchange: The source exchange to bind from
2583        :param str routing_key: The routing key to unbind
2584        :param dict arguments: Custom key/value pair arguments for the binding
2585
2586        :returns: Method frame from the Queue.Unbind-ok response
2587        :rtype: `pika.frame.Method` having `method` attribute of type
2588            `spec.Queue.UnbindOk`
2589
2590        """
2591        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2592                unbind_ok_result:
2593            self._impl.queue_unbind(
2594                queue=queue,
2595                exchange=exchange,
2596                routing_key=routing_key,
2597                arguments=arguments,
2598                callback=unbind_ok_result.set_value_once)
2599            self._flush_output(unbind_ok_result.is_ready)
2600            return unbind_ok_result.value.method_frame
2601
2602    def tx_select(self):
2603        """Select standard transaction mode. This method sets the channel to use
2604        standard transactions. The client must use this method at least once on
2605        a channel before using the Commit or Rollback methods.
2606
2607        :returns: Method frame from the Tx.Select-ok response
2608        :rtype: `pika.frame.Method` having `method` attribute of type
2609            `spec.Tx.SelectOk`
2610
2611        """
2612        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2613                select_ok_result:
2614            self._impl.tx_select(select_ok_result.set_value_once)
2615
2616            self._flush_output(select_ok_result.is_ready)
2617            return select_ok_result.value.method_frame
2618
2619    def tx_commit(self):
2620        """Commit a transaction.
2621
2622        :returns: Method frame from the Tx.Commit-ok response
2623        :rtype: `pika.frame.Method` having `method` attribute of type
2624            `spec.Tx.CommitOk`
2625
2626        """
2627        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2628                commit_ok_result:
2629            self._impl.tx_commit(commit_ok_result.set_value_once)
2630
2631            self._flush_output(commit_ok_result.is_ready)
2632            return commit_ok_result.value.method_frame
2633
2634    def tx_rollback(self):
2635        """Rollback a transaction.
2636
2637        :returns: Method frame from the Tx.Commit-ok response
2638        :rtype: `pika.frame.Method` having `method` attribute of type
2639            `spec.Tx.CommitOk`
2640
2641        """
2642        with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
2643                rollback_ok_result:
2644            self._impl.tx_rollback(rollback_ok_result.set_value_once)
2645
2646            self._flush_output(rollback_ok_result.is_ready)
2647            return rollback_ok_result.value.method_frame
2648