1"""The Channel class provides a wrapper for interacting with RabbitMQ
2implementing the methods and behaviors for an AMQP Channel.
3
4"""
5# disable too-many-lines
6# pylint: disable=C0302
7
8import collections
9import logging
10import uuid
11
12import pika.frame as frame
13import pika.exceptions as exceptions
14import pika.spec as spec
15import pika.validators as validators
16from pika.compat import unicode_type, dictkeys, is_integer
17
18LOGGER = logging.getLogger(__name__)
19
20MAX_CHANNELS = 65535  # per AMQP 0.9.1 spec.
21
22
23class Channel(object):
24    """A Channel is the primary communication method for interacting with
25    RabbitMQ. It is recommended that you do not directly invoke the creation of
26    a channel object in your application code but rather construct a channel by
27    calling the active connection's channel() method.
28
29    """
30
31    # Disable pylint messages concerning "method could be a function"
32    # pylint: disable=R0201
33
34    CLOSED = 0
35    OPENING = 1
36    OPEN = 2
37    CLOSING = 3  # client-initiated close in progress
38
39    _STATE_NAMES = {
40        CLOSED: 'CLOSED',
41        OPENING: 'OPENING',
42        OPEN: 'OPEN',
43        CLOSING: 'CLOSING'
44    }
45
46    _ON_CHANNEL_CLEANUP_CB_KEY = '_on_channel_cleanup'
47
48    def __init__(self, connection, channel_number, on_open_callback):
49        """Create a new instance of the Channel
50
51        :param pika.connection.Connection connection: The connection
52        :param int channel_number: The channel number for this instance
53        :param callable on_open_callback: The callback to call on channel open.
54            The callback will be invoked with the `Channel` instance as its only
55            argument.
56
57        """
58        if not isinstance(channel_number, int):
59            raise exceptions.InvalidChannelNumber(channel_number)
60
61        validators.rpc_completion_callback(on_open_callback)
62
63        self.channel_number = channel_number
64        self.callbacks = connection.callbacks
65        self.connection = connection
66
67        # Initially, flow is assumed to be active
68        self.flow_active = True
69
70        self._content_assembler = ContentFrameAssembler()
71
72        self._blocked = collections.deque(list())
73        self._blocking = None
74        self._has_on_flow_callback = False
75        self._cancelled = set()
76        self._consumers = dict()
77        self._consumers_with_noack = set()
78        self._on_flowok_callback = None
79        self._on_getok_callback = None
80        self._on_openok_callback = on_open_callback
81        self._state = self.CLOSED
82
83        # We save the closing reason exception to be passed to on-channel-close
84        # callback at closing of the channel. Exception representing the closing
85        # reason; ChannelClosedByClient or ChannelClosedByBroker on controlled
86        # close; otherwise another exception describing the reason for failure
87        # (most likely connection failure).
88        self._closing_reason = None  # type: None | Exception
89
90        # opaque cookie value set by wrapper layer (e.g., BlockingConnection)
91        # via _set_cookie
92        self._cookie = None
93
94    def __int__(self):
95        """Return the channel object as its channel number
96
97        :rtype: int
98
99        """
100        return self.channel_number
101
102    def __repr__(self):
103        return '<%s number=%s %s conn=%r>' % (
104            self.__class__.__name__, self.channel_number,
105            self._STATE_NAMES[self._state], self.connection)
106
107    def add_callback(self, callback, replies, one_shot=True):
108        """Pass in a callback handler and a list replies from the
109        RabbitMQ broker which you'd like the callback notified of. Callbacks
110        should allow for the frame parameter to be passed in.
111
112        :param callable callback: The callback to call
113        :param list replies: The replies to get a callback for
114        :param bool one_shot: Only handle the first type callback
115
116        """
117        for reply in replies:
118            self.callbacks.add(self.channel_number, reply, callback, one_shot)
119
120    def add_on_cancel_callback(self, callback):
121        """Pass a callback function that will be called when the basic_cancel
122        is sent by the server. The callback function should receive a frame
123        parameter.
124
125        :param callable callback: The callback to call on Basic.Cancel from
126            broker
127
128        """
129        self.callbacks.add(self.channel_number, spec.Basic.Cancel, callback,
130                           False)
131
132    def add_on_close_callback(self, callback):
133        """Pass a callback function that will be called when the channel is
134        closed. The callback function will receive the channel and an exception
135        describing why the channel was closed.
136
137        If the channel is closed by broker via Channel.Close, the callback will
138        receive `ChannelClosedByBroker` as the reason.
139
140        If graceful user-initiated channel closing completes successfully (
141        either directly of indirectly by closing a connection containing the
142        channel) and closing concludes gracefully without Channel.Close from the
143        broker and without loss of connection, the callback will receive
144        `ChannelClosedByClient` exception as reason.
145
146        If channel was closed due to loss of connection, the callback will
147        receive another exception type describing the failure.
148
149        :param callable callback: The callback, having the signature:
150            callback(Channel, Exception reason)
151
152        """
153        self.callbacks.add(self.channel_number, '_on_channel_close', callback,
154                           False, self)
155
156    def add_on_flow_callback(self, callback):
157        """Pass a callback function that will be called when Channel.Flow is
158        called by the remote server. Note that newer versions of RabbitMQ
159        will not issue this but instead use TCP backpressure
160
161        :param callable callback: The callback function
162
163        """
164        self._has_on_flow_callback = True
165        self.callbacks.add(self.channel_number, spec.Channel.Flow, callback,
166                           False)
167
168    def add_on_return_callback(self, callback):
169        """Pass a callback function that will be called when basic_publish is
170        sent a message that has been rejected and returned by the server.
171
172        :param callable callback: The function to call, having the signature
173                                callback(channel, method, properties, body)
174                                where
175                                channel: pika.Channel
176                                method: pika.spec.Basic.Return
177                                properties: pika.spec.BasicProperties
178                                body: bytes
179
180        """
181        self.callbacks.add(self.channel_number, '_on_return', callback, False)
182
183    def basic_ack(self, delivery_tag=0, multiple=False):
184        """Acknowledge one or more messages. When sent by the client, this
185        method acknowledges one or more messages delivered via the Deliver or
186        Get-Ok methods. When sent by server, this method acknowledges one or
187        more messages published with the Publish method on a channel in
188        confirm mode. The acknowledgement can be for a single message or a
189        set of messages up to and including a specific message.
190
191        :param integer delivery_tag: int/long The server-assigned delivery tag
192        :param bool multiple: If set to True, the delivery tag is treated as
193                              "up to and including", so that multiple messages
194                              can be acknowledged with a single method. If set
195                              to False, the delivery tag refers to a single
196                              message. If the multiple field is 1, and the
197                              delivery tag is zero, this indicates
198                              acknowledgement of all outstanding messages.
199
200        """
201        self._raise_if_not_open()
202        return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
203
204    def basic_cancel(self, consumer_tag='', callback=None):
205        """This method cancels a consumer. This does not affect already
206        delivered messages, but it does mean the server will not send any more
207        messages for that consumer. The client may receive an arbitrary number
208        of messages in between sending the cancel method and receiving the
209        cancel-ok reply. It may also be sent from the server to the client in
210        the event of the consumer being unexpectedly cancelled (i.e. cancelled
211        for any reason other than the server receiving the corresponding
212        basic.cancel from the client). This allows clients to be notified of
213        the loss of consumers due to events such as queue deletion.
214
215        :param str consumer_tag: Identifier for the consumer
216        :param callable callback: callback(pika.frame.Method) for method
217            Basic.CancelOk. If None, do not expect a Basic.CancelOk response,
218            otherwise, callback must be callable
219
220        :raises ValueError:
221
222        """
223        validators.require_string(consumer_tag, 'consumer_tag')
224        self._raise_if_not_open()
225        nowait = validators.rpc_completion_callback(callback)
226
227        if consumer_tag in self._cancelled:
228            # We check for cancelled first, because basic_cancel removes
229            # consumers closed with nowait from self._consumers
230            LOGGER.warning('basic_cancel - consumer is already cancelling: %s',
231                           consumer_tag)
232            return
233
234        if consumer_tag not in self._consumers:
235            # Could be cancelled by user or broker earlier
236            LOGGER.warning('basic_cancel - consumer not found: %s',
237                           consumer_tag)
238            return
239
240        LOGGER.debug('Cancelling consumer: %s (nowait=%s)', consumer_tag,
241                     nowait)
242
243        if nowait:
244            # This is our last opportunity while the channel is open to remove
245            # this consumer callback and help gc; unfortunately, this consumer's
246            # self._cancelled and self._consumers_with_noack (if any) entries
247            # will persist until the channel is closed.
248            del self._consumers[consumer_tag]
249
250        if callback is not None:
251            self.callbacks.add(self.channel_number, spec.Basic.CancelOk,
252                               callback)
253
254        self._cancelled.add(consumer_tag)
255
256        self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait),
257                  self._on_cancelok if not nowait else None,
258                  [(spec.Basic.CancelOk, {
259                      'consumer_tag': consumer_tag
260                  })] if not nowait else [])
261
262    def basic_consume(self,
263                      queue,
264                      on_message_callback,
265                      auto_ack=False,
266                      exclusive=False,
267                      consumer_tag=None,
268                      arguments=None,
269                      callback=None):
270        """Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds messages
271        for the consumer_tag to the consumer callback. If you do not pass in
272        a consumer_tag, one will be automatically generated for you. Returns
273        the consumer tag.
274
275        For more information on basic_consume, see:
276        Tutorial 2 at http://www.rabbitmq.com/getstarted.html
277        http://www.rabbitmq.com/confirms.html
278        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
279
280        :param str queue: The queue to consume from. Use the empty string to
281            specify the most recent server-named queue for this channel
282        :param callable on_message_callback: The function to call when
283            consuming with the signature
284            on_message_callback(channel, method, properties, body), where
285                channel: pika.Channel
286                method: pika.spec.Basic.Deliver
287                properties: pika.spec.BasicProperties
288                body: bytes
289        :param bool auto_ack: if set to True, automatic acknowledgement mode
290            will be used (see http://www.rabbitmq.com/confirms.html).
291            This corresponds with the 'no_ack' parameter in the basic.consume
292            AMQP 0.9.1 method
293        :param bool exclusive: Don't allow other consumers on the queue
294        :param str consumer_tag: Specify your own consumer tag
295        :param dict arguments: Custom key/value pair arguments for the consumer
296        :param callable callback: callback(pika.frame.Method) for method
297          Basic.ConsumeOk.
298        :returns: Consumer tag which may be used to cancel the consumer.
299        :rtype: str
300        :raises ValueError:
301
302        """
303        validators.require_string(queue, 'queue')
304        validators.require_callback(on_message_callback)
305        self._raise_if_not_open()
306        validators.rpc_completion_callback(callback)
307
308        # If a consumer tag was not passed, create one
309        if not consumer_tag:
310            consumer_tag = self._generate_consumer_tag()
311
312        if consumer_tag in self._consumers or consumer_tag in self._cancelled:
313            raise exceptions.DuplicateConsumerTag(consumer_tag)
314
315        if auto_ack:
316            self._consumers_with_noack.add(consumer_tag)
317
318        self._consumers[consumer_tag] = on_message_callback
319
320        rpc_callback = self._on_eventok if callback is None else callback
321
322        self._rpc(
323            spec.Basic.Consume(queue=queue,
324                               consumer_tag=consumer_tag,
325                               no_ack=auto_ack,
326                               exclusive=exclusive,
327                               arguments=arguments or dict()), rpc_callback,
328            [(spec.Basic.ConsumeOk, {
329                'consumer_tag': consumer_tag
330            })])
331
332        return consumer_tag
333
334    def _generate_consumer_tag(self):
335        """Generate a consumer tag
336
337        NOTE: this protected method may be called by derived classes
338
339        :returns: consumer tag
340        :rtype: str
341
342        """
343        return 'ctag%i.%s' % (self.channel_number, uuid.uuid4().hex)
344
345    def basic_get(self, queue, callback, auto_ack=False):
346        """Get a single message from the AMQP broker. If you want to
347        be notified of Basic.GetEmpty, use the Channel.add_callback method
348        adding your Basic.GetEmpty callback which should expect only one
349        parameter, frame. Due to implementation details, this cannot be called
350        a second time until the callback is executed.  For more information on
351        basic_get and its parameters, see:
352
353        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
354
355        :param str queue: The queue from which to get a message. Use the empty
356            string to specify the most recent server-named queue for this
357            channel
358        :param callable callback: The callback to call with a message that has
359            the signature callback(channel, method, properties, body), where:
360            channel: pika.Channel
361            method: pika.spec.Basic.GetOk
362            properties: pika.spec.BasicProperties
363            body: bytes
364        :param bool auto_ack: Tell the broker to not expect a reply
365        :raises ValueError:
366
367        """
368        validators.require_string(queue, 'queue')
369        validators.require_callback(callback)
370        if self._on_getok_callback is not None:
371            raise exceptions.DuplicateGetOkCallback()
372        self._on_getok_callback = callback
373
374        # pylint: disable=W0511
375        # TODO Strangely, not using _rpc for the synchronous Basic.Get. Would
376        # need to extend _rpc to handle Basic.GetOk method, header, and body
377        # frames (or similar)
378        self._send_method(spec.Basic.Get(queue=queue, no_ack=auto_ack))
379
380    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
381        """This method allows a client to reject one or more incoming messages.
382        It can be used to interrupt and cancel large incoming messages, or
383        return untreatable messages to their original queue.
384
385        :param integer delivery-tag: int/long The server-assigned delivery tag
386        :param bool multiple: If set to True, the delivery tag is treated as
387                              "up to and including", so that multiple messages
388                              can be acknowledged with a single method. If set
389                              to False, the delivery tag refers to a single
390                              message. If the multiple field is 1, and the
391                              delivery tag is zero, this indicates
392                              acknowledgement of all outstanding messages.
393        :param bool requeue: If requeue is true, the server will attempt to
394                             requeue the message. If requeue is false or the
395                             requeue attempt fails the messages are discarded or
396                             dead-lettered.
397
398        """
399        self._raise_if_not_open()
400        return self._send_method(
401            spec.Basic.Nack(delivery_tag, multiple, requeue))
402
403    def basic_publish(self,
404                      exchange,
405                      routing_key,
406                      body,
407                      properties=None,
408                      mandatory=False):
409        """Publish to the channel with the given exchange, routing key and body.
410        For more information on basic_publish and what the parameters do, see:
411
412        http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
413
414        :param str exchange: The exchange to publish to
415        :param str routing_key: The routing key to bind on
416        :param bytes body: The message body
417        :param pika.spec.BasicProperties properties: Basic.properties
418        :param bool mandatory: The mandatory flag
419
420        """
421        self._raise_if_not_open()
422        if isinstance(body, unicode_type):
423            body = body.encode('utf-8')
424        properties = properties or spec.BasicProperties()
425        self._send_method(
426            spec.Basic.Publish(exchange=exchange,
427                               routing_key=routing_key,
428                               mandatory=mandatory), (properties, body))
429
430    def basic_qos(self,
431                  prefetch_size=0,
432                  prefetch_count=0,
433                  global_qos=False,
434                  callback=None):
435        """Specify quality of service. This method requests a specific quality
436        of service. The QoS can be specified for the current channel or for all
437        channels on the connection. The client can request that messages be sent
438        in advance so that when the client finishes processing a message, the
439        following message is already held locally, rather than needing to be
440        sent down the channel. Prefetching gives a performance improvement.
441
442        :param int prefetch_size:  This field specifies the prefetch window
443                                   size. The server will send a message in
444                                   advance if it is equal to or smaller in size
445                                   than the available prefetch size (and also
446                                   falls into other prefetch limits). May be set
447                                   to zero, meaning "no specific limit",
448                                   although other prefetch limits may still
449                                   apply. The prefetch-size is ignored by
450                                   consumers who have enabled the no-ack option.
451        :param int prefetch_count: Specifies a prefetch window in terms of whole
452                                   messages. This field may be used in
453                                   combination with the prefetch-size field; a
454                                   message will only be sent in advance if both
455                                   prefetch windows (and those at the channel
456                                   and connection level) allow it. The
457                                   prefetch-count is ignored by consumers who
458                                   have enabled the no-ack option.
459        :param bool global_qos:    Should the QoS apply to all channels on the
460                                   connection.
461        :param callable callback: The callback to call for Basic.QosOk response
462        :raises ValueError:
463
464        """
465        self._raise_if_not_open()
466        validators.rpc_completion_callback(callback)
467        validators.zero_or_greater('prefetch_size', prefetch_size)
468        validators.zero_or_greater('prefetch_count', prefetch_count)
469        return self._rpc(
470            spec.Basic.Qos(prefetch_size, prefetch_count, global_qos), callback,
471            [spec.Basic.QosOk])
472
473    def basic_reject(self, delivery_tag, requeue=True):
474        """Reject an incoming message. This method allows a client to reject a
475        message. It can be used to interrupt and cancel large incoming messages,
476        or return untreatable messages to their original queue.
477
478        :param integer delivery-tag: int/long The server-assigned delivery tag
479        :param bool requeue: If requeue is true, the server will attempt to
480                             requeue the message. If requeue is false or the
481                             requeue attempt fails the messages are discarded or
482                             dead-lettered.
483        :raises: TypeError
484
485        """
486        self._raise_if_not_open()
487        if not is_integer(delivery_tag):
488            raise TypeError('delivery_tag must be an integer')
489        return self._send_method(spec.Basic.Reject(delivery_tag, requeue))
490
491    def basic_recover(self, requeue=False, callback=None):
492        """This method asks the server to redeliver all unacknowledged messages
493        on a specified channel. Zero or more messages may be redelivered. This
494        method replaces the asynchronous Recover.
495
496        :param bool requeue: If False, the message will be redelivered to the
497                             original recipient. If True, the server will
498                             attempt to requeue the message, potentially then
499                             delivering it to an alternative subscriber.
500        :param callable callback: Callback to call when receiving
501            Basic.RecoverOk
502        :param callable callback: callback(pika.frame.Method) for method
503            Basic.RecoverOk
504        :raises ValueError:
505
506        """
507        self._raise_if_not_open()
508        validators.rpc_completion_callback(callback)
509        return self._rpc(spec.Basic.Recover(requeue), callback,
510                         [spec.Basic.RecoverOk])
511
512    def close(self, reply_code=0, reply_text="Normal shutdown"):
513        """Invoke a graceful shutdown of the channel with the AMQP Broker.
514
515        If channel is OPENING, transition to CLOSING and suppress the incoming
516        Channel.OpenOk, if any.
517
518        :param int reply_code: The reason code to send to broker
519        :param str reply_text: The reason text to send to broker
520
521        :raises ChannelWrongStateError: if channel is closed or closing
522
523        """
524        if self.is_closed or self.is_closing:
525            # Whoever is calling `close` might expect the on-channel-close-cb
526            # to be called, which won't happen when it's already closed.
527            self._raise_if_not_open()
528
529        # If channel is OPENING, we will transition it to CLOSING state,
530        # causing the _on_openok method to suppress the OPEN state transition
531        # and the on-channel-open-callback
532
533        LOGGER.info('Closing channel (%s): %r on %s', reply_code, reply_text,
534                    self)
535
536        # Save the reason info so that we may use it in the '_on_channel_close'
537        # callback processing
538        self._closing_reason = exceptions.ChannelClosedByClient(
539            reply_code, reply_text)
540
541        for consumer_tag in dictkeys(self._consumers):
542            if consumer_tag not in self._cancelled:
543                self.basic_cancel(consumer_tag=consumer_tag)
544
545        # Change state after cancelling consumers to avoid
546        # ChannelWrongStateError exception from basic_cancel
547        self._set_state(self.CLOSING)
548
549        self._rpc(spec.Channel.Close(reply_code, reply_text, 0, 0),
550                  self._on_closeok, [spec.Channel.CloseOk])
551
552    def confirm_delivery(self, ack_nack_callback, callback=None):
553        """Turn on Confirm mode in the channel. Pass in a callback to be
554        notified by the Broker when a message has been confirmed as received or
555        rejected (Basic.Ack, Basic.Nack) from the broker to the publisher.
556
557        For more information see:
558            https://www.rabbitmq.com/confirms.html
559
560        :param callable ack_nack_callback: Required callback for delivery
561            confirmations that has the following signature:
562            callback(pika.frame.Method), where method_frame contains
563            either method `spec.Basic.Ack` or `spec.Basic.Nack`.
564        :param callable callback: callback(pika.frame.Method) for method
565            Confirm.SelectOk
566        :raises ValueError:
567
568        """
569        if not callable(ack_nack_callback):
570            # confirm_deliver requires a callback; it's meaningless
571            # without a user callback to receieve Basic.Ack/Basic.Nack notifications
572            raise ValueError('confirm_delivery requires a callback '
573                             'to receieve Basic.Ack/Basic.Nack notifications')
574
575        self._raise_if_not_open()
576        nowait = validators.rpc_completion_callback(callback)
577
578        if not (self.connection.publisher_confirms and
579                self.connection.basic_nack):
580            raise exceptions.MethodNotImplemented(
581                'Confirm.Select not Supported by Server')
582
583        # Add the ack and nack callback
584        self.callbacks.add(self.channel_number, spec.Basic.Ack,
585                           ack_nack_callback, False)
586        self.callbacks.add(self.channel_number, spec.Basic.Nack,
587                           ack_nack_callback, False)
588
589        self._rpc(spec.Confirm.Select(nowait), callback,
590                  [spec.Confirm.SelectOk] if not nowait else [])
591
592    @property
593    def consumer_tags(self):
594        """Property method that returns a list of currently active consumers
595
596        :rtype: list
597
598        """
599        return dictkeys(self._consumers)
600
601    def exchange_bind(self,
602                      destination,
603                      source,
604                      routing_key='',
605                      arguments=None,
606                      callback=None):
607        """Bind an exchange to another exchange.
608
609        :param str destination: The destination exchange to bind
610        :param str source: The source exchange to bind to
611        :param str routing_key: The routing key to bind on
612        :param dict arguments: Custom key/value pair arguments for the binding
613        :param callable callback: callback(pika.frame.Method) for method Exchange.BindOk
614        :raises ValueError:
615
616        """
617        self._raise_if_not_open()
618        validators.require_string(destination, 'destination')
619        validators.require_string(source, 'source')
620        nowait = validators.rpc_completion_callback(callback)
621        return self._rpc(
622            spec.Exchange.Bind(0, destination, source, routing_key, nowait,
623                               arguments or dict()), callback,
624            [spec.Exchange.BindOk] if not nowait else [])
625
626    def exchange_declare(self,
627                         exchange,
628                         exchange_type='direct',
629                         passive=False,
630                         durable=False,
631                         auto_delete=False,
632                         internal=False,
633                         arguments=None,
634                         callback=None):
635        """This method creates an exchange if it does not already exist, and if
636        the exchange exists, verifies that it is of the correct and expected
637        class.
638
639        If passive set, the server will reply with Declare-Ok if the exchange
640        already exists with the same name, and raise an error if not and if the
641        exchange does not already exist, the server MUST raise a channel
642        exception with reply code 404 (not found).
643
644        :param str exchange: The exchange name consists of a non-empty sequence
645            of these characters: letters, digits, hyphen, underscore, period,
646            or colon
647        :param str exchange_type: The exchange type to use
648        :param bool passive: Perform a declare or just check to see if it exists
649        :param bool durable: Survive a reboot of RabbitMQ
650        :param bool auto_delete: Remove when no more queues are bound to it
651        :param bool internal: Can only be published to by other exchanges
652        :param dict arguments: Custom key/value pair arguments for the exchange
653        :param callable callback: callback(pika.frame.Method) for method Exchange.DeclareOk
654        :raises ValueError:
655
656        """
657        validators.require_string(exchange, 'exchange')
658        self._raise_if_not_open()
659        nowait = validators.rpc_completion_callback(callback)
660        return self._rpc(
661            spec.Exchange.Declare(0, exchange, exchange_type, passive, durable,
662                                  auto_delete, internal, nowait, arguments or
663                                  dict()), callback,
664            [spec.Exchange.DeclareOk] if not nowait else [])
665
666    def exchange_delete(self, exchange=None, if_unused=False, callback=None):
667        """Delete the exchange.
668
669        :param str exchange: The exchange name
670        :param bool if_unused: only delete if the exchange is unused
671        :param callable callback: callback(pika.frame.Method) for method Exchange.DeleteOk
672        :raises ValueError:
673
674        """
675        self._raise_if_not_open()
676        nowait = validators.rpc_completion_callback(callback)
677        return self._rpc(spec.Exchange.Delete(0, exchange, if_unused,
678                                              nowait), callback,
679                         [spec.Exchange.DeleteOk] if not nowait else [])
680
681    def exchange_unbind(self,
682                        destination=None,
683                        source=None,
684                        routing_key='',
685                        arguments=None,
686                        callback=None):
687        """Unbind an exchange from another exchange.
688
689        :param str destination: The destination exchange to unbind
690        :param str source: The source exchange to unbind from
691        :param str routing_key: The routing key to unbind
692        :param dict arguments: Custom key/value pair arguments for the binding
693        :param callable callback: callback(pika.frame.Method) for method Exchange.UnbindOk
694        :raises ValueError:
695
696        """
697        self._raise_if_not_open()
698        nowait = validators.rpc_completion_callback(callback)
699        return self._rpc(
700            spec.Exchange.Unbind(0, destination, source, routing_key, nowait,
701                                 arguments), callback,
702            [spec.Exchange.UnbindOk] if not nowait else [])
703
704    def flow(self, active, callback=None):
705        """Turn Channel flow control off and on. Pass a callback to be notified
706        of the response from the server. active is a bool. Callback should
707        expect a bool in response indicating channel flow state. For more
708        information, please reference:
709
710        http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
711
712        :param bool active: Turn flow on or off
713        :param callable callback: callback(bool) upon completion
714        :raises ValueError:
715
716        """
717        self._raise_if_not_open()
718        validators.rpc_completion_callback(callback)
719        self._on_flowok_callback = callback
720        self._rpc(spec.Channel.Flow(active), self._on_flowok,
721                  [spec.Channel.FlowOk])
722
723    @property
724    def is_closed(self):
725        """Returns True if the channel is closed.
726
727        :rtype: bool
728
729        """
730        return self._state == self.CLOSED
731
732    @property
733    def is_closing(self):
734        """Returns True if client-initiated closing of the channel is in
735        progress.
736
737        :rtype: bool
738
739        """
740        return self._state == self.CLOSING
741
742    @property
743    def is_open(self):
744        """Returns True if the channel is open.
745
746        :rtype: bool
747
748        """
749        return self._state == self.OPEN
750
751    def open(self):
752        """Open the channel"""
753        self._set_state(self.OPENING)
754        self._add_callbacks()
755        self._rpc(spec.Channel.Open(), self._on_openok, [spec.Channel.OpenOk])
756
757    def queue_bind(self,
758                   queue,
759                   exchange,
760                   routing_key=None,
761                   arguments=None,
762                   callback=None):
763        """Bind the queue to the specified exchange
764
765        :param str queue: The queue to bind to the exchange
766        :param str exchange: The source exchange to bind to
767        :param str routing_key: The routing key to bind on
768        :param dict arguments: Custom key/value pair arguments for the binding
769        :param callable callback: callback(pika.frame.Method) for method Queue.BindOk
770        :raises ValueError:
771
772        """
773        validators.require_string(queue, 'queue')
774        validators.require_string(exchange, 'exchange')
775        self._raise_if_not_open()
776        nowait = validators.rpc_completion_callback(callback)
777        if routing_key is None:
778            routing_key = queue
779        return self._rpc(
780            spec.Queue.Bind(0, queue, exchange, routing_key, nowait,
781                            arguments or dict()), callback,
782            [spec.Queue.BindOk] if not nowait else [])
783
784    def queue_declare(self,
785                      queue,
786                      passive=False,
787                      durable=False,
788                      exclusive=False,
789                      auto_delete=False,
790                      arguments=None,
791                      callback=None):
792        """Declare queue, create if needed. This method creates or checks a
793        queue. When creating a new queue the client can specify various
794        properties that control the durability of the queue and its contents,
795        and the level of sharing for the queue.
796
797        Use an empty string as the queue name for the broker to auto-generate
798        one
799
800        :param str queue: The queue name; if empty string, the broker will
801            create a unique queue name
802        :param bool passive: Only check to see if the queue exists
803        :param bool durable: Survive reboots of the broker
804        :param bool exclusive: Only allow access by the current connection
805        :param bool auto_delete: Delete after consumer cancels or disconnects
806        :param dict arguments: Custom key/value arguments for the queue
807        :param callable callback: callback(pika.frame.Method) for method Queue.DeclareOk
808        :raises ValueError:
809
810        """
811        validators.require_string(queue, 'queue')
812        self._raise_if_not_open()
813        nowait = validators.rpc_completion_callback(callback)
814
815        if queue:
816            condition = (spec.Queue.DeclareOk, {'queue': queue})
817        else:
818            condition = spec.Queue.DeclareOk
819        replies = [condition] if not nowait else []
820
821        return self._rpc(
822            spec.Queue.Declare(0, queue, passive, durable, exclusive,
823                               auto_delete, nowait, arguments or dict()),
824            callback, replies)
825
826    def queue_delete(self,
827                     queue,
828                     if_unused=False,
829                     if_empty=False,
830                     callback=None):
831        """Delete a queue from the broker.
832
833        :param str queue: The queue to delete
834        :param bool if_unused: only delete if it's unused
835        :param bool if_empty: only delete if the queue is empty
836        :param callable callback: callback(pika.frame.Method) for method Queue.DeleteOk
837        :raises ValueError:
838
839        """
840        self._raise_if_not_open()
841        validators.require_string(queue, 'queue')
842        nowait = validators.rpc_completion_callback(callback)
843        replies = [spec.Queue.DeleteOk] if not nowait else []
844        return self._rpc(
845            spec.Queue.Delete(0, queue, if_unused, if_empty, nowait), callback,
846            replies)
847
848    def queue_purge(self, queue, callback=None):
849        """Purge all of the messages from the specified queue
850
851        :param str queue: The queue to purge
852        :param callable callback: callback(pika.frame.Method) for method Queue.PurgeOk
853        :raises ValueError:
854
855        """
856        self._raise_if_not_open()
857        validators.require_string(queue, 'queue')
858        nowait = validators.rpc_completion_callback(callback)
859        replies = [spec.Queue.PurgeOk] if not nowait else []
860        return self._rpc(spec.Queue.Purge(0, queue, nowait), callback, replies)
861
862    def queue_unbind(self,
863                     queue,
864                     exchange=None,
865                     routing_key=None,
866                     arguments=None,
867                     callback=None):
868        """Unbind a queue from an exchange.
869
870        :param str queue: The queue to unbind from the exchange
871        :param str exchange: The source exchange to bind from
872        :param str routing_key: The routing key to unbind
873        :param dict arguments: Custom key/value pair arguments for the binding
874        :param callable callback: callback(pika.frame.Method) for method Queue.UnbindOk
875        :raises ValueError:
876
877        """
878        self._raise_if_not_open()
879        validators.require_string(queue, 'queue')
880        validators.rpc_completion_callback(callback)
881        if routing_key is None:
882            routing_key = queue
883        return self._rpc(
884            spec.Queue.Unbind(0, queue, exchange, routing_key, arguments or
885                              dict()), callback, [spec.Queue.UnbindOk])
886
887    def tx_commit(self, callback=None):
888        """Commit a transaction
889
890        :param callable callback: The callback for delivery confirmations
891        :raises ValueError:
892
893        """
894        self._raise_if_not_open()
895        validators.rpc_completion_callback(callback)
896        return self._rpc(spec.Tx.Commit(), callback, [spec.Tx.CommitOk])
897
898    def tx_rollback(self, callback=None):
899        """Rollback a transaction.
900
901        :param callable callback: The callback for delivery confirmations
902        :raises ValueError:
903
904        """
905        self._raise_if_not_open()
906        validators.rpc_completion_callback(callback)
907        return self._rpc(spec.Tx.Rollback(), callback, [spec.Tx.RollbackOk])
908
909    def tx_select(self, callback=None):
910        """Select standard transaction mode. This method sets the channel to use
911        standard transactions. The client must use this method at least once on
912        a channel before using the Commit or Rollback methods.
913
914        :param callable callback: The callback for delivery confirmations
915        :raises ValueError:
916
917        """
918        self._raise_if_not_open()
919        validators.rpc_completion_callback(callback)
920        return self._rpc(spec.Tx.Select(), callback, [spec.Tx.SelectOk])
921
922    # Internal methods
923
924    def _add_callbacks(self):
925        """Callbacks that add the required behavior for a channel when
926        connecting and connected to a server.
927
928        """
929        # Add a callback for Basic.GetEmpty
930        self.callbacks.add(self.channel_number, spec.Basic.GetEmpty,
931                           self._on_getempty, False)
932
933        # Add a callback for Basic.Cancel
934        self.callbacks.add(self.channel_number, spec.Basic.Cancel,
935                           self._on_cancel, False)
936
937        # Deprecated in newer versions of RabbitMQ but still register for it
938        self.callbacks.add(self.channel_number, spec.Channel.Flow,
939                           self._on_flow, False)
940
941        # Add a callback for when the server closes our channel
942        self.callbacks.add(self.channel_number, spec.Channel.Close,
943                           self._on_close_from_broker, True)
944
945    def _add_on_cleanup_callback(self, callback):
946        """For internal use only (e.g., Connection needs to remove closed
947        channels from its channel container). Pass a callback function that will
948        be called when the channel is being cleaned up after all channel-close
949        callbacks callbacks.
950
951        :param callable callback: The callback to call, having the
952            signature: callback(channel)
953
954        """
955        self.callbacks.add(self.channel_number,
956                           self._ON_CHANNEL_CLEANUP_CB_KEY,
957                           callback,
958                           one_shot=True,
959                           only_caller=self)
960
961    def _cleanup(self):
962        """Remove all consumers and any callbacks for the channel."""
963        self.callbacks.process(self.channel_number,
964                               self._ON_CHANNEL_CLEANUP_CB_KEY, self, self)
965        self._consumers = dict()
966        self.callbacks.cleanup(str(self.channel_number))
967        self._cookie = None
968
969    def _cleanup_consumer_ref(self, consumer_tag):
970        """Remove any references to the consumer tag in internal structures
971        for consumer state.
972
973        :param str consumer_tag: The consumer tag to cleanup
974
975        """
976        self._consumers_with_noack.discard(consumer_tag)
977        self._consumers.pop(consumer_tag, None)
978        self._cancelled.discard(consumer_tag)
979
980    def _get_cookie(self):
981        """Used by the wrapper implementation (e.g., `BlockingChannel`) to
982        retrieve the cookie that it set via `_set_cookie`
983
984        :returns: opaque cookie value that was set via `_set_cookie`
985        :rtype: object
986
987        """
988        return self._cookie
989
990    def _handle_content_frame(self, frame_value):
991        """This is invoked by the connection when frames that are not registered
992        with the CallbackManager have been found. This should only be the case
993        when the frames are related to content delivery.
994
995        The _content_assembler will be invoked which will return the fully
996        formed message in three parts when all of the body frames have been
997        received.
998
999        :param pika.amqp_object.Frame frame_value: The frame to deliver
1000
1001        """
1002        try:
1003            response = self._content_assembler.process(frame_value)
1004        except exceptions.UnexpectedFrameError:
1005            self._on_unexpected_frame(frame_value)
1006            return
1007
1008        if response:
1009            if isinstance(response[0].method, spec.Basic.Deliver):
1010                self._on_deliver(*response)
1011            elif isinstance(response[0].method, spec.Basic.GetOk):
1012                self._on_getok(*response)
1013            elif isinstance(response[0].method, spec.Basic.Return):
1014                self._on_return(*response)
1015
1016    def _on_cancel(self, method_frame):
1017        """When the broker cancels a consumer, delete it from our internal
1018        dictionary.
1019
1020        :param pika.frame.Method method_frame: The method frame received
1021
1022        """
1023        if method_frame.method.consumer_tag in self._cancelled:
1024            # User-initiated cancel is waiting for Cancel-ok
1025            return
1026
1027        self._cleanup_consumer_ref(method_frame.method.consumer_tag)
1028
1029    def _on_cancelok(self, method_frame):
1030        """Called in response to a frame from the Broker when the
1031         client sends Basic.Cancel
1032
1033        :param pika.frame.Method method_frame: The method frame received
1034
1035        """
1036        self._cleanup_consumer_ref(method_frame.method.consumer_tag)
1037
1038    def _transition_to_closed(self):
1039        """Common logic for transitioning the channel to the CLOSED state:
1040
1041        Set state to CLOSED, dispatch callbacks registered via
1042        `Channel.add_on_close_callback()`, and mop up.
1043
1044        Assumes that the channel is not in CLOSED state and that
1045        `self._closing_reason` has been set up
1046
1047        """
1048        assert not self.is_closed
1049        assert self._closing_reason is not None
1050
1051        self._set_state(self.CLOSED)
1052
1053        try:
1054            self.callbacks.process(self.channel_number, '_on_channel_close',
1055                                   self, self, self._closing_reason)
1056        finally:
1057            self._cleanup()
1058
1059    def _on_close_from_broker(self, method_frame):
1060        """Handle `Channel.Close` from broker.
1061
1062        :param pika.frame.Method method_frame: Method frame with Channel.Close
1063            method
1064
1065        """
1066        LOGGER.warning('Received remote Channel.Close (%s): %r on %s',
1067                       method_frame.method.reply_code,
1068                       method_frame.method.reply_text, self)
1069        # Note, we should not be called when channel is already closed
1070        assert not self.is_closed
1071
1072        # AMQP 0.9.1 requires CloseOk response to Channel.Close;
1073        self._send_method(spec.Channel.CloseOk())
1074
1075        # Save the details, possibly overriding user-provided values if
1076        # user-initiated close is pending (in which case they will be provided
1077        # to user callback when CloseOk arrives).
1078        self._closing_reason = exceptions.ChannelClosedByBroker(
1079            method_frame.method.reply_code, method_frame.method.reply_text)
1080
1081        if self.is_closing:
1082            # Since we may have already put Channel.Close on the wire, we need
1083            # to wait for CloseOk before cleaning up to avoid a race condition
1084            # whereby our channel number might get reused before our CloseOk
1085            # arrives
1086            #
1087            # NOTE: if our Channel.Close destined for the broker was blocked by
1088            # an earlier synchronous method, this call will drop it and perform
1089            # a meta-close (see `_on_close_meta()` which fakes receipt of
1090            # `Channel.CloseOk` and dispatches the `'_on_channel_close'`
1091            # callbacks.
1092            self._drain_blocked_methods_on_remote_close()
1093        else:
1094            self._transition_to_closed()
1095
1096    def _on_close_meta(self, reason):
1097        """Handle meta-close request from either a remote Channel.Close from
1098        the broker (when a pending Channel.Close method is queued for
1099        execution) or a Connection's cleanup logic after sudden connection
1100        loss. We use this opportunity to transition to CLOSED state, clean up
1101        the channel, and dispatch the on-channel-closed callbacks.
1102
1103        :param Exception reason: Exception describing the reason for closing.
1104
1105        """
1106        LOGGER.debug('Handling meta-close on %s: %r', self, reason)
1107
1108        if not self.is_closed:
1109            self._closing_reason = reason
1110            self._transition_to_closed()
1111
1112    def _on_closeok(self, method_frame):
1113        """Invoked when RabbitMQ replies to a Channel.Close method
1114
1115        :param pika.frame.Method method_frame: Method frame with Channel.CloseOk
1116            method
1117
1118        """
1119        LOGGER.info('Received %s on %s', method_frame.method, self)
1120
1121        self._transition_to_closed()
1122
1123    def _on_deliver(self, method_frame, header_frame, body):
1124        """Cope with reentrancy. If a particular consumer is still active when
1125        another delivery appears for it, queue the deliveries up until it
1126        finally exits.
1127
1128        :param pika.frame.Method method_frame: The method frame received
1129        :param pika.frame.Header header_frame: The header frame received
1130        :param bytes body: The body received
1131
1132        """
1133        consumer_tag = method_frame.method.consumer_tag
1134
1135        if consumer_tag in self._cancelled:
1136            if self.is_open and consumer_tag not in self._consumers_with_noack:
1137                self.basic_reject(method_frame.method.delivery_tag)
1138            return
1139
1140        if consumer_tag not in self._consumers:
1141            LOGGER.error('Unexpected delivery: %r', method_frame)
1142            return
1143
1144        self._consumers[consumer_tag](self, method_frame.method,
1145                                      header_frame.properties, body)
1146
1147    def _on_eventok(self, method_frame):
1148        """Generic events that returned ok that may have internal callbacks.
1149        We keep a list of what we've yet to implement so that we don't silently
1150        drain events that we don't support.
1151
1152        :param pika.frame.Method method_frame: The method frame received
1153
1154        """
1155        LOGGER.debug('Discarding frame %r', method_frame)
1156
1157    def _on_flow(self, _method_frame_unused):
1158        """Called if the server sends a Channel.Flow frame.
1159
1160        :param pika.frame.Method method_frame_unused: The Channel.Flow frame
1161
1162        """
1163        if self._has_on_flow_callback is False:
1164            LOGGER.warning('Channel.Flow received from server')
1165
1166    def _on_flowok(self, method_frame):
1167        """Called in response to us asking the server to toggle on Channel.Flow
1168
1169        :param pika.frame.Method method_frame: The method frame received
1170
1171        """
1172        self.flow_active = method_frame.method.active
1173        if self._on_flowok_callback:
1174            self._on_flowok_callback(method_frame.method.active)
1175            self._on_flowok_callback = None
1176        else:
1177            LOGGER.warning('Channel.FlowOk received with no active callbacks')
1178
1179    def _on_getempty(self, method_frame):
1180        """When we receive an empty reply do nothing but log it
1181
1182        :param pika.frame.Method method_frame: The method frame received
1183
1184        """
1185        LOGGER.debug('Received Basic.GetEmpty: %r', method_frame)
1186        if self._on_getok_callback is not None:
1187            self._on_getok_callback = None
1188
1189    def _on_getok(self, method_frame, header_frame, body):
1190        """Called in reply to a Basic.Get when there is a message.
1191
1192        :param pika.frame.Method method_frame: The method frame received
1193        :param pika.frame.Header header_frame: The header frame received
1194        :param bytes body: The body received
1195
1196        """
1197        if self._on_getok_callback is not None:
1198            callback = self._on_getok_callback
1199            self._on_getok_callback = None
1200            callback(self, method_frame.method, header_frame.properties, body)
1201        else:
1202            LOGGER.error('Basic.GetOk received with no active callback')
1203
1204    def _on_openok(self, method_frame):
1205        """Called by our callback handler when we receive a Channel.OpenOk and
1206        subsequently calls our _on_openok_callback which was passed into the
1207        Channel constructor. The reason we do this is because we want to make
1208        sure that the on_open_callback parameter passed into the Channel
1209        constructor is not the first callback we make.
1210
1211        Suppress the state transition and callback if channel is already in
1212        CLOSING state.
1213
1214        :param pika.frame.Method method_frame: Channel.OpenOk frame
1215
1216        """
1217        # Suppress OpenOk if the user or Connection.Close started closing it
1218        # before open completed.
1219        if self.is_closing:
1220            LOGGER.debug('Suppressing while in closing state: %s', method_frame)
1221        else:
1222            self._set_state(self.OPEN)
1223            if self._on_openok_callback is not None:
1224                self._on_openok_callback(self)
1225
1226    def _on_return(self, method_frame, header_frame, body):
1227        """Called if the server sends a Basic.Return frame.
1228
1229        :param pika.frame.Method method_frame: The Basic.Return frame
1230        :param pika.frame.Header header_frame: The content header frame
1231        :param bytes body: The message body
1232
1233        """
1234        if not self.callbacks.process(self.channel_number, '_on_return', self,
1235                                      self, method_frame.method,
1236                                      header_frame.properties, body):
1237            LOGGER.warning('Basic.Return received from server (%r, %r)',
1238                           method_frame.method, header_frame.properties)
1239
1240    def _on_selectok(self, method_frame):
1241        """Called when the broker sends a Confirm.SelectOk frame
1242
1243        :param pika.frame.Method method_frame: The method frame received
1244
1245        """
1246        LOGGER.debug("Confirm.SelectOk Received: %r", method_frame)
1247
1248    def _on_synchronous_complete(self, _method_frame_unused):
1249        """This is called when a synchronous command is completed. It will undo
1250        the blocking state and send all the frames that stacked up while we
1251        were in the blocking state.
1252
1253        :param pika.frame.Method method_frame_unused: The method frame received
1254
1255        """
1256        LOGGER.debug('%i blocked frames', len(self._blocked))
1257        self._blocking = None
1258        # self._blocking must be checked here as a callback could
1259        # potentially change the state of that variable during an
1260        # iteration of the while loop
1261        while self._blocked and self._blocking is None:
1262            self._rpc(*self._blocked.popleft())
1263
1264    def _drain_blocked_methods_on_remote_close(self):
1265        """This is called when the broker sends a Channel.Close while the
1266        client is in CLOSING state. This method checks the blocked method
1267        queue for a pending client-initiated Channel.Close method and
1268        ensures its callbacks are processed, but does not send the method
1269        to the broker.
1270
1271        The broker may close the channel before responding to outstanding
1272        in-transit synchronous methods, or even before these methods have been
1273        sent to the broker. AMQP 0.9.1 obliges the server to drop all methods
1274        arriving on a closed channel other than Channel.CloseOk and
1275        Channel.Close. Since the response to a synchronous method that blocked
1276        the channel never arrives, the channel never becomes unblocked, and the
1277        Channel.Close, if any, in the blocked queue has no opportunity to be
1278        sent, and thus its completion callback would never be called.
1279
1280        """
1281        LOGGER.debug(
1282            'Draining %i blocked frames due to broker-requested Channel.Close',
1283            len(self._blocked))
1284        while self._blocked:
1285            method = self._blocked.popleft()[0]
1286            if isinstance(method, spec.Channel.Close):
1287                # The desired reason is already in self._closing_reason
1288                self._on_close_meta(self._closing_reason)
1289            else:
1290                LOGGER.debug('Ignoring drained blocked method: %s', method)
1291
    def _rpc(self, method, callback=None, acceptable_replies=None):
        """Make a synchronous channel RPC call for a synchronous method frame.
        If the channel is already in the blocking state, then enqueue the
        request, but don't send it at this time; it will be eventually sent by
        `_on_synchronous_complete` after the prior blocking request receives a
        response. If the channel is not in the blocking state and
        `acceptable_replies` is not empty, transition the channel to the
        blocking state and register for `_on_synchronous_complete` before
        sending the request.

        NOTE: A callback must be accompanied by non-empty acceptable_replies.

        :param pika.amqp_object.Method method: The AMQP method to invoke
        :param callable callback: The callback for the RPC response
        :param list|None acceptable_replies: A (possibly empty) sequence of
            replies this RPC call expects or None. Each entry is either a
            reply key or a `(reply key, arguments)` tuple whose `arguments`
            are passed along when registering the callbacks.
        :raises TypeError: if `acceptable_replies` is neither None nor a list,
            or if `callback` is neither None nor callable
        :raises ValueError: if `callback` is given without any acceptable
            replies
        :raises exceptions.ChannelWrongStateError: if the channel is closed

        """
        assert method.synchronous, (
            'Only synchronous-capable methods may be used with _rpc: %r' %
            (method,))

        # Validate we got None or a list of acceptable_replies
        if not isinstance(acceptable_replies, (type(None), list)):
            raise TypeError('acceptable_replies should be list or None')

        if callback is not None:
            # Validate the callback is callable
            if not callable(callback):
                raise TypeError('callback should be None or a callable')

            # Make sure that callback is accompanied by acceptable replies
            if not acceptable_replies:
                raise ValueError(
                    'Unexpected callback for asynchronous (nowait) operation.')

        # Make sure the channel is not closed yet; only the CLOSED state is
        # rejected here, so requests may still be issued while the channel is
        # OPENING or CLOSING
        if self.is_closed:
            self._raise_if_not_open()

        # If the channel is blocking, add subsequent commands to our stack
        if self._blocking:
            LOGGER.debug(
                'Already in blocking state, so enqueueing method %s; '
                'acceptable_replies=%r', method, acceptable_replies)
            # Stored in the same order as this method's parameters so that
            # _on_synchronous_complete can replay it with self._rpc(*entry)
            self._blocked.append([method, callback, acceptable_replies])
            return

        # Note: _send_method can throw exceptions if there are framing errors
        # or invalid data passed in. Call it here to prevent self._blocking
        # from being set if an exception is thrown. This also prevents
        # acceptable_replies registering callbacks when exceptions are thrown
        self._send_method(method)

        # If acceptable replies are set, add callbacks
        if acceptable_replies:
            # Block until a response frame is received for synchronous frames
            self._blocking = method.NAME
            LOGGER.debug(
                'Entering blocking state on frame %s; acceptable_replies=%r',
                method, acceptable_replies)

            for reply in acceptable_replies:
                # A tuple entry carries (reply key, arguments)
                if isinstance(reply, tuple):
                    reply, arguments = reply
                else:
                    arguments = None
                LOGGER.debug('Adding on_synchronous_complete callback')
                # Registered ahead of the user callback for the same reply
                self.callbacks.add(self.channel_number,
                                   reply,
                                   self._on_synchronous_complete,
                                   arguments=arguments)
                if callback is not None:
                    LOGGER.debug('Adding passed-in RPC response callback')
                    self.callbacks.add(self.channel_number,
                                       reply,
                                       callback,
                                       arguments=arguments)
1370
1371    def _raise_if_not_open(self):
1372        """If channel is not in the OPEN state, raises ChannelWrongStateError
1373        with `reply_code` and `reply_text` corresponding to current state.
1374
1375        :raises exceptions.ChannelWrongStateError: if channel is not in OPEN
1376            state.
1377        """
1378        if self._state == self.OPEN:
1379            return
1380
1381        if self._state == self.OPENING:
1382            raise exceptions.ChannelWrongStateError('Channel is opening, but is not usable yet.')
1383
1384        if self._state == self.CLOSING:
1385            raise exceptions.ChannelWrongStateError('Channel is closing.')
1386
1387        # Assumed self.CLOSED
1388        assert self._state == self.CLOSED
1389        raise exceptions.ChannelWrongStateError('Channel is closed.')
1390
1391    def _send_method(self, method, content=None):
1392        """Shortcut wrapper to send a method through our connection, passing in
1393        the channel number
1394
1395        :param pika.amqp_object.Method method: The method to send
1396        :param tuple content: If set, is a content frame, is tuple of
1397                              properties and body.
1398
1399        """
1400        # pylint: disable=W0212
1401        self.connection._send_method(self.channel_number, method, content)
1402
1403    def _set_cookie(self, cookie):
1404        """Used by wrapper layer (e.g., `BlockingConnection`) to link the
1405        channel implementation back to the proxy. See `_get_cookie`.
1406
1407        :param cookie: an opaque value; typically a proxy channel implementation
1408            instance (e.g., `BlockingChannel` instance)
1409        """
1410        self._cookie = cookie
1411
1412    def _set_state(self, connection_state):
1413        """Set the channel connection state to the specified state value.
1414
1415        :param int connection_state: The connection_state value
1416
1417        """
1418        self._state = connection_state
1419
1420    def _on_unexpected_frame(self, frame_value):
1421        """Invoked when a frame is received that is not setup to be processed.
1422
1423        :param pika.frame.Frame frame_value: The frame received
1424
1425        """
1426        LOGGER.error('Unexpected frame: %r', frame_value)
1427
1428
class ContentFrameAssembler(object):
    """Assemble the frames of a content-bearing method into one message.

    Frames are passed one at a time to `process`. The method and header
    frames are remembered and the body fragments are accumulated; as soon as
    the number of body bytes received equals the `body_size` announced by
    the header frame, the message is handed back in three parts (method
    frame, header frame, body) and the assembler is reset for the next one.

    """

    def __init__(self):
        """Create a new instance of the content frame assembler.

        """
        self._method_frame = None
        self._header_frame = None
        # Number of body bytes received for the message being assembled
        self._seen_so_far = 0
        self._body_fragments = []

    def process(self, frame_value):
        """Invoked by the Channel object when passed frames that are not
        setup in the rpc process and that don't have explicit reply types
        defined. This includes Basic.Publish, Basic.GetOk and Basic.Return

        :param Method|Header|Body frame_value: The frame to process
        :returns: The assembled message once it is complete, otherwise None
        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)|None
        :raises pika.exceptions.UnexpectedFrameError: if the frame is not a
            content-bearing method frame, a header frame or a body frame, or
            if a body frame arrives before any header frame
        :raises pika.exceptions.BodyTooLongError: if more body bytes arrive
            than the header frame announced

        """
        if (isinstance(frame_value, frame.Method) and
                spec.has_content(frame_value.method.INDEX)):
            self._method_frame = frame_value
            return None

        if isinstance(frame_value, frame.Header):
            self._header_frame = frame_value
            # An empty body is never followed by body frames, so the message
            # is already complete.
            if frame_value.body_size == 0:
                return self._finish()
            return None

        if isinstance(frame_value, frame.Body):
            return self._handle_body_frame(frame_value)

        # Includes method frames whose method carries no content
        raise exceptions.UnexpectedFrameError(frame_value)

    def _finish(self):
        """Invoked when all of the message has been received. Joins the body
        fragments, resets the assembler and returns the message.

        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)

        """
        content = (self._method_frame, self._header_frame,
                   b''.join(self._body_fragments))
        self._reset()
        return content

    def _handle_body_frame(self, body_frame):
        """Receive body frames and append them to the stack. When the body size
        matches, call the finish method.

        :param Body body_frame: The body frame
        :raises pika.exceptions.UnexpectedFrameError: if no header frame has
            been received for the current message
        :raises pika.exceptions.BodyTooLongError: if the accumulated body is
            longer than the size announced by the header frame
        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)|None

        """
        header_frame = self._header_frame
        if header_frame is None:
            # Without a header there is no expected size to measure the body
            # against. Reject the frame before touching any state instead of
            # failing later with an AttributeError on None.
            raise exceptions.UnexpectedFrameError(body_frame)

        self._seen_so_far += len(body_frame.fragment)
        self._body_fragments.append(body_frame.fragment)

        expected_size = header_frame.body_size
        if self._seen_so_far == expected_size:
            return self._finish()
        if self._seen_so_far > expected_size:
            raise exceptions.BodyTooLongError(self._seen_so_far,
                                              expected_size)
        # More body frames are still expected
        return None

    def _reset(self):
        """Reset the values for processing frames"""
        self._method_frame = None
        self._header_frame = None
        self._seen_so_far = 0
        self._body_fragments = []
1502