1"""The Channel class provides a wrapper for interacting with RabbitMQ 2implementing the methods and behaviors for an AMQP Channel. 3 4""" 5# disable too-many-lines 6# pylint: disable=C0302 7 8import collections 9import logging 10import uuid 11 12import pika.frame as frame 13import pika.exceptions as exceptions 14import pika.spec as spec 15import pika.validators as validators 16from pika.compat import unicode_type, dictkeys, is_integer 17 18LOGGER = logging.getLogger(__name__) 19 20MAX_CHANNELS = 65535 # per AMQP 0.9.1 spec. 21 22 23class Channel(object): 24 """A Channel is the primary communication method for interacting with 25 RabbitMQ. It is recommended that you do not directly invoke the creation of 26 a channel object in your application code but rather construct a channel by 27 calling the active connection's channel() method. 28 29 """ 30 31 # Disable pylint messages concerning "method could be a function" 32 # pylint: disable=R0201 33 34 CLOSED = 0 35 OPENING = 1 36 OPEN = 2 37 CLOSING = 3 # client-initiated close in progress 38 39 _STATE_NAMES = { 40 CLOSED: 'CLOSED', 41 OPENING: 'OPENING', 42 OPEN: 'OPEN', 43 CLOSING: 'CLOSING' 44 } 45 46 _ON_CHANNEL_CLEANUP_CB_KEY = '_on_channel_cleanup' 47 48 def __init__(self, connection, channel_number, on_open_callback): 49 """Create a new instance of the Channel 50 51 :param pika.connection.Connection connection: The connection 52 :param int channel_number: The channel number for this instance 53 :param callable on_open_callback: The callback to call on channel open. 54 The callback will be invoked with the `Channel` instance as its only 55 argument. 
56 57 """ 58 if not isinstance(channel_number, int): 59 raise exceptions.InvalidChannelNumber(channel_number) 60 61 validators.rpc_completion_callback(on_open_callback) 62 63 self.channel_number = channel_number 64 self.callbacks = connection.callbacks 65 self.connection = connection 66 67 # Initially, flow is assumed to be active 68 self.flow_active = True 69 70 self._content_assembler = ContentFrameAssembler() 71 72 self._blocked = collections.deque(list()) 73 self._blocking = None 74 self._has_on_flow_callback = False 75 self._cancelled = set() 76 self._consumers = dict() 77 self._consumers_with_noack = set() 78 self._on_flowok_callback = None 79 self._on_getok_callback = None 80 self._on_openok_callback = on_open_callback 81 self._state = self.CLOSED 82 83 # We save the closing reason exception to be passed to on-channel-close 84 # callback at closing of the channel. Exception representing the closing 85 # reason; ChannelClosedByClient or ChannelClosedByBroker on controlled 86 # close; otherwise another exception describing the reason for failure 87 # (most likely connection failure). 88 self._closing_reason = None # type: None | Exception 89 90 # opaque cookie value set by wrapper layer (e.g., BlockingConnection) 91 # via _set_cookie 92 self._cookie = None 93 94 def __int__(self): 95 """Return the channel object as its channel number 96 97 :rtype: int 98 99 """ 100 return self.channel_number 101 102 def __repr__(self): 103 return '<%s number=%s %s conn=%r>' % ( 104 self.__class__.__name__, self.channel_number, 105 self._STATE_NAMES[self._state], self.connection) 106 107 def add_callback(self, callback, replies, one_shot=True): 108 """Pass in a callback handler and a list replies from the 109 RabbitMQ broker which you'd like the callback notified of. Callbacks 110 should allow for the frame parameter to be passed in. 
111 112 :param callable callback: The callback to call 113 :param list replies: The replies to get a callback for 114 :param bool one_shot: Only handle the first type callback 115 116 """ 117 for reply in replies: 118 self.callbacks.add(self.channel_number, reply, callback, one_shot) 119 120 def add_on_cancel_callback(self, callback): 121 """Pass a callback function that will be called when the basic_cancel 122 is sent by the server. The callback function should receive a frame 123 parameter. 124 125 :param callable callback: The callback to call on Basic.Cancel from 126 broker 127 128 """ 129 self.callbacks.add(self.channel_number, spec.Basic.Cancel, callback, 130 False) 131 132 def add_on_close_callback(self, callback): 133 """Pass a callback function that will be called when the channel is 134 closed. The callback function will receive the channel and an exception 135 describing why the channel was closed. 136 137 If the channel is closed by broker via Channel.Close, the callback will 138 receive `ChannelClosedByBroker` as the reason. 139 140 If graceful user-initiated channel closing completes successfully ( 141 either directly of indirectly by closing a connection containing the 142 channel) and closing concludes gracefully without Channel.Close from the 143 broker and without loss of connection, the callback will receive 144 `ChannelClosedByClient` exception as reason. 145 146 If channel was closed due to loss of connection, the callback will 147 receive another exception type describing the failure. 148 149 :param callable callback: The callback, having the signature: 150 callback(Channel, Exception reason) 151 152 """ 153 self.callbacks.add(self.channel_number, '_on_channel_close', callback, 154 False, self) 155 156 def add_on_flow_callback(self, callback): 157 """Pass a callback function that will be called when Channel.Flow is 158 called by the remote server. 
Note that newer versions of RabbitMQ 159 will not issue this but instead use TCP backpressure 160 161 :param callable callback: The callback function 162 163 """ 164 self._has_on_flow_callback = True 165 self.callbacks.add(self.channel_number, spec.Channel.Flow, callback, 166 False) 167 168 def add_on_return_callback(self, callback): 169 """Pass a callback function that will be called when basic_publish is 170 sent a message that has been rejected and returned by the server. 171 172 :param callable callback: The function to call, having the signature 173 callback(channel, method, properties, body) 174 where 175 channel: pika.Channel 176 method: pika.spec.Basic.Return 177 properties: pika.spec.BasicProperties 178 body: bytes 179 180 """ 181 self.callbacks.add(self.channel_number, '_on_return', callback, False) 182 183 def basic_ack(self, delivery_tag=0, multiple=False): 184 """Acknowledge one or more messages. When sent by the client, this 185 method acknowledges one or more messages delivered via the Deliver or 186 Get-Ok methods. When sent by server, this method acknowledges one or 187 more messages published with the Publish method on a channel in 188 confirm mode. The acknowledgement can be for a single message or a 189 set of messages up to and including a specific message. 190 191 :param integer delivery_tag: int/long The server-assigned delivery tag 192 :param bool multiple: If set to True, the delivery tag is treated as 193 "up to and including", so that multiple messages 194 can be acknowledged with a single method. If set 195 to False, the delivery tag refers to a single 196 message. If the multiple field is 1, and the 197 delivery tag is zero, this indicates 198 acknowledgement of all outstanding messages. 199 200 """ 201 self._raise_if_not_open() 202 return self._send_method(spec.Basic.Ack(delivery_tag, multiple)) 203 204 def basic_cancel(self, consumer_tag='', callback=None): 205 """This method cancels a consumer. 
This does not affect already 206 delivered messages, but it does mean the server will not send any more 207 messages for that consumer. The client may receive an arbitrary number 208 of messages in between sending the cancel method and receiving the 209 cancel-ok reply. It may also be sent from the server to the client in 210 the event of the consumer being unexpectedly cancelled (i.e. cancelled 211 for any reason other than the server receiving the corresponding 212 basic.cancel from the client). This allows clients to be notified of 213 the loss of consumers due to events such as queue deletion. 214 215 :param str consumer_tag: Identifier for the consumer 216 :param callable callback: callback(pika.frame.Method) for method 217 Basic.CancelOk. If None, do not expect a Basic.CancelOk response, 218 otherwise, callback must be callable 219 220 :raises ValueError: 221 222 """ 223 validators.require_string(consumer_tag, 'consumer_tag') 224 self._raise_if_not_open() 225 nowait = validators.rpc_completion_callback(callback) 226 227 if consumer_tag in self._cancelled: 228 # We check for cancelled first, because basic_cancel removes 229 # consumers closed with nowait from self._consumers 230 LOGGER.warning('basic_cancel - consumer is already cancelling: %s', 231 consumer_tag) 232 return 233 234 if consumer_tag not in self._consumers: 235 # Could be cancelled by user or broker earlier 236 LOGGER.warning('basic_cancel - consumer not found: %s', 237 consumer_tag) 238 return 239 240 LOGGER.debug('Cancelling consumer: %s (nowait=%s)', consumer_tag, 241 nowait) 242 243 if nowait: 244 # This is our last opportunity while the channel is open to remove 245 # this consumer callback and help gc; unfortunately, this consumer's 246 # self._cancelled and self._consumers_with_noack (if any) entries 247 # will persist until the channel is closed. 
248 del self._consumers[consumer_tag] 249 250 if callback is not None: 251 self.callbacks.add(self.channel_number, spec.Basic.CancelOk, 252 callback) 253 254 self._cancelled.add(consumer_tag) 255 256 self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait), 257 self._on_cancelok if not nowait else None, 258 [(spec.Basic.CancelOk, { 259 'consumer_tag': consumer_tag 260 })] if not nowait else []) 261 262 def basic_consume(self, 263 queue, 264 on_message_callback, 265 auto_ack=False, 266 exclusive=False, 267 consumer_tag=None, 268 arguments=None, 269 callback=None): 270 """Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds messages 271 for the consumer_tag to the consumer callback. If you do not pass in 272 a consumer_tag, one will be automatically generated for you. Returns 273 the consumer tag. 274 275 For more information on basic_consume, see: 276 Tutorial 2 at http://www.rabbitmq.com/getstarted.html 277 http://www.rabbitmq.com/confirms.html 278 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume 279 280 :param str queue: The queue to consume from. Use the empty string to 281 specify the most recent server-named queue for this channel 282 :param callable on_message_callback: The function to call when 283 consuming with the signature 284 on_message_callback(channel, method, properties, body), where 285 channel: pika.Channel 286 method: pika.spec.Basic.Deliver 287 properties: pika.spec.BasicProperties 288 body: bytes 289 :param bool auto_ack: if set to True, automatic acknowledgement mode 290 will be used (see http://www.rabbitmq.com/confirms.html). 
291 This corresponds with the 'no_ack' parameter in the basic.consume 292 AMQP 0.9.1 method 293 :param bool exclusive: Don't allow other consumers on the queue 294 :param str consumer_tag: Specify your own consumer tag 295 :param dict arguments: Custom key/value pair arguments for the consumer 296 :param callable callback: callback(pika.frame.Method) for method 297 Basic.ConsumeOk. 298 :returns: Consumer tag which may be used to cancel the consumer. 299 :rtype: str 300 :raises ValueError: 301 302 """ 303 validators.require_string(queue, 'queue') 304 validators.require_callback(on_message_callback) 305 self._raise_if_not_open() 306 validators.rpc_completion_callback(callback) 307 308 # If a consumer tag was not passed, create one 309 if not consumer_tag: 310 consumer_tag = self._generate_consumer_tag() 311 312 if consumer_tag in self._consumers or consumer_tag in self._cancelled: 313 raise exceptions.DuplicateConsumerTag(consumer_tag) 314 315 if auto_ack: 316 self._consumers_with_noack.add(consumer_tag) 317 318 self._consumers[consumer_tag] = on_message_callback 319 320 rpc_callback = self._on_eventok if callback is None else callback 321 322 self._rpc( 323 spec.Basic.Consume(queue=queue, 324 consumer_tag=consumer_tag, 325 no_ack=auto_ack, 326 exclusive=exclusive, 327 arguments=arguments or dict()), rpc_callback, 328 [(spec.Basic.ConsumeOk, { 329 'consumer_tag': consumer_tag 330 })]) 331 332 return consumer_tag 333 334 def _generate_consumer_tag(self): 335 """Generate a consumer tag 336 337 NOTE: this protected method may be called by derived classes 338 339 :returns: consumer tag 340 :rtype: str 341 342 """ 343 return 'ctag%i.%s' % (self.channel_number, uuid.uuid4().hex) 344 345 def basic_get(self, queue, callback, auto_ack=False): 346 """Get a single message from the AMQP broker. If you want to 347 be notified of Basic.GetEmpty, use the Channel.add_callback method 348 adding your Basic.GetEmpty callback which should expect only one 349 parameter, frame. 
Due to implementation details, this cannot be called 350 a second time until the callback is executed. For more information on 351 basic_get and its parameters, see: 352 353 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get 354 355 :param str queue: The queue from which to get a message. Use the empty 356 string to specify the most recent server-named queue for this 357 channel 358 :param callable callback: The callback to call with a message that has 359 the signature callback(channel, method, properties, body), where: 360 channel: pika.Channel 361 method: pika.spec.Basic.GetOk 362 properties: pika.spec.BasicProperties 363 body: bytes 364 :param bool auto_ack: Tell the broker to not expect a reply 365 :raises ValueError: 366 367 """ 368 validators.require_string(queue, 'queue') 369 validators.require_callback(callback) 370 if self._on_getok_callback is not None: 371 raise exceptions.DuplicateGetOkCallback() 372 self._on_getok_callback = callback 373 374 # pylint: disable=W0511 375 # TODO Strangely, not using _rpc for the synchronous Basic.Get. Would 376 # need to extend _rpc to handle Basic.GetOk method, header, and body 377 # frames (or similar) 378 self._send_method(spec.Basic.Get(queue=queue, no_ack=auto_ack)) 379 380 def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): 381 """This method allows a client to reject one or more incoming messages. 382 It can be used to interrupt and cancel large incoming messages, or 383 return untreatable messages to their original queue. 384 385 :param integer delivery-tag: int/long The server-assigned delivery tag 386 :param bool multiple: If set to True, the delivery tag is treated as 387 "up to and including", so that multiple messages 388 can be acknowledged with a single method. If set 389 to False, the delivery tag refers to a single 390 message. If the multiple field is 1, and the 391 delivery tag is zero, this indicates 392 acknowledgement of all outstanding messages. 
393 :param bool requeue: If requeue is true, the server will attempt to 394 requeue the message. If requeue is false or the 395 requeue attempt fails the messages are discarded or 396 dead-lettered. 397 398 """ 399 self._raise_if_not_open() 400 return self._send_method( 401 spec.Basic.Nack(delivery_tag, multiple, requeue)) 402 403 def basic_publish(self, 404 exchange, 405 routing_key, 406 body, 407 properties=None, 408 mandatory=False): 409 """Publish to the channel with the given exchange, routing key and body. 410 For more information on basic_publish and what the parameters do, see: 411 412 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish 413 414 :param str exchange: The exchange to publish to 415 :param str routing_key: The routing key to bind on 416 :param bytes body: The message body 417 :param pika.spec.BasicProperties properties: Basic.properties 418 :param bool mandatory: The mandatory flag 419 420 """ 421 self._raise_if_not_open() 422 if isinstance(body, unicode_type): 423 body = body.encode('utf-8') 424 properties = properties or spec.BasicProperties() 425 self._send_method( 426 spec.Basic.Publish(exchange=exchange, 427 routing_key=routing_key, 428 mandatory=mandatory), (properties, body)) 429 430 def basic_qos(self, 431 prefetch_size=0, 432 prefetch_count=0, 433 global_qos=False, 434 callback=None): 435 """Specify quality of service. This method requests a specific quality 436 of service. The QoS can be specified for the current channel or for all 437 channels on the connection. The client can request that messages be sent 438 in advance so that when the client finishes processing a message, the 439 following message is already held locally, rather than needing to be 440 sent down the channel. Prefetching gives a performance improvement. 441 442 :param int prefetch_size: This field specifies the prefetch window 443 size. 
The server will send a message in 444 advance if it is equal to or smaller in size 445 than the available prefetch size (and also 446 falls into other prefetch limits). May be set 447 to zero, meaning "no specific limit", 448 although other prefetch limits may still 449 apply. The prefetch-size is ignored by 450 consumers who have enabled the no-ack option. 451 :param int prefetch_count: Specifies a prefetch window in terms of whole 452 messages. This field may be used in 453 combination with the prefetch-size field; a 454 message will only be sent in advance if both 455 prefetch windows (and those at the channel 456 and connection level) allow it. The 457 prefetch-count is ignored by consumers who 458 have enabled the no-ack option. 459 :param bool global_qos: Should the QoS apply to all channels on the 460 connection. 461 :param callable callback: The callback to call for Basic.QosOk response 462 :raises ValueError: 463 464 """ 465 self._raise_if_not_open() 466 validators.rpc_completion_callback(callback) 467 validators.zero_or_greater('prefetch_size', prefetch_size) 468 validators.zero_or_greater('prefetch_count', prefetch_count) 469 return self._rpc( 470 spec.Basic.Qos(prefetch_size, prefetch_count, global_qos), callback, 471 [spec.Basic.QosOk]) 472 473 def basic_reject(self, delivery_tag, requeue=True): 474 """Reject an incoming message. This method allows a client to reject a 475 message. It can be used to interrupt and cancel large incoming messages, 476 or return untreatable messages to their original queue. 477 478 :param integer delivery-tag: int/long The server-assigned delivery tag 479 :param bool requeue: If requeue is true, the server will attempt to 480 requeue the message. If requeue is false or the 481 requeue attempt fails the messages are discarded or 482 dead-lettered. 
483 :raises: TypeError 484 485 """ 486 self._raise_if_not_open() 487 if not is_integer(delivery_tag): 488 raise TypeError('delivery_tag must be an integer') 489 return self._send_method(spec.Basic.Reject(delivery_tag, requeue)) 490 491 def basic_recover(self, requeue=False, callback=None): 492 """This method asks the server to redeliver all unacknowledged messages 493 on a specified channel. Zero or more messages may be redelivered. This 494 method replaces the asynchronous Recover. 495 496 :param bool requeue: If False, the message will be redelivered to the 497 original recipient. If True, the server will 498 attempt to requeue the message, potentially then 499 delivering it to an alternative subscriber. 500 :param callable callback: Callback to call when receiving 501 Basic.RecoverOk 502 :param callable callback: callback(pika.frame.Method) for method 503 Basic.RecoverOk 504 :raises ValueError: 505 506 """ 507 self._raise_if_not_open() 508 validators.rpc_completion_callback(callback) 509 return self._rpc(spec.Basic.Recover(requeue), callback, 510 [spec.Basic.RecoverOk]) 511 512 def close(self, reply_code=0, reply_text="Normal shutdown"): 513 """Invoke a graceful shutdown of the channel with the AMQP Broker. 514 515 If channel is OPENING, transition to CLOSING and suppress the incoming 516 Channel.OpenOk, if any. 517 518 :param int reply_code: The reason code to send to broker 519 :param str reply_text: The reason text to send to broker 520 521 :raises ChannelWrongStateError: if channel is closed or closing 522 523 """ 524 if self.is_closed or self.is_closing: 525 # Whoever is calling `close` might expect the on-channel-close-cb 526 # to be called, which won't happen when it's already closed. 
527 self._raise_if_not_open() 528 529 # If channel is OPENING, we will transition it to CLOSING state, 530 # causing the _on_openok method to suppress the OPEN state transition 531 # and the on-channel-open-callback 532 533 LOGGER.info('Closing channel (%s): %r on %s', reply_code, reply_text, 534 self) 535 536 # Save the reason info so that we may use it in the '_on_channel_close' 537 # callback processing 538 self._closing_reason = exceptions.ChannelClosedByClient( 539 reply_code, reply_text) 540 541 for consumer_tag in dictkeys(self._consumers): 542 if consumer_tag not in self._cancelled: 543 self.basic_cancel(consumer_tag=consumer_tag) 544 545 # Change state after cancelling consumers to avoid 546 # ChannelWrongStateError exception from basic_cancel 547 self._set_state(self.CLOSING) 548 549 self._rpc(spec.Channel.Close(reply_code, reply_text, 0, 0), 550 self._on_closeok, [spec.Channel.CloseOk]) 551 552 def confirm_delivery(self, ack_nack_callback, callback=None): 553 """Turn on Confirm mode in the channel. Pass in a callback to be 554 notified by the Broker when a message has been confirmed as received or 555 rejected (Basic.Ack, Basic.Nack) from the broker to the publisher. 556 557 For more information see: 558 https://www.rabbitmq.com/confirms.html 559 560 :param callable ack_nack_callback: Required callback for delivery 561 confirmations that has the following signature: 562 callback(pika.frame.Method), where method_frame contains 563 either method `spec.Basic.Ack` or `spec.Basic.Nack`. 
564 :param callable callback: callback(pika.frame.Method) for method 565 Confirm.SelectOk 566 :raises ValueError: 567 568 """ 569 if not callable(ack_nack_callback): 570 # confirm_deliver requires a callback; it's meaningless 571 # without a user callback to receieve Basic.Ack/Basic.Nack notifications 572 raise ValueError('confirm_delivery requires a callback ' 573 'to receieve Basic.Ack/Basic.Nack notifications') 574 575 self._raise_if_not_open() 576 nowait = validators.rpc_completion_callback(callback) 577 578 if not (self.connection.publisher_confirms and 579 self.connection.basic_nack): 580 raise exceptions.MethodNotImplemented( 581 'Confirm.Select not Supported by Server') 582 583 # Add the ack and nack callback 584 self.callbacks.add(self.channel_number, spec.Basic.Ack, 585 ack_nack_callback, False) 586 self.callbacks.add(self.channel_number, spec.Basic.Nack, 587 ack_nack_callback, False) 588 589 self._rpc(spec.Confirm.Select(nowait), callback, 590 [spec.Confirm.SelectOk] if not nowait else []) 591 592 @property 593 def consumer_tags(self): 594 """Property method that returns a list of currently active consumers 595 596 :rtype: list 597 598 """ 599 return dictkeys(self._consumers) 600 601 def exchange_bind(self, 602 destination, 603 source, 604 routing_key='', 605 arguments=None, 606 callback=None): 607 """Bind an exchange to another exchange. 
608 609 :param str destination: The destination exchange to bind 610 :param str source: The source exchange to bind to 611 :param str routing_key: The routing key to bind on 612 :param dict arguments: Custom key/value pair arguments for the binding 613 :param callable callback: callback(pika.frame.Method) for method Exchange.BindOk 614 :raises ValueError: 615 616 """ 617 self._raise_if_not_open() 618 validators.require_string(destination, 'destination') 619 validators.require_string(source, 'source') 620 nowait = validators.rpc_completion_callback(callback) 621 return self._rpc( 622 spec.Exchange.Bind(0, destination, source, routing_key, nowait, 623 arguments or dict()), callback, 624 [spec.Exchange.BindOk] if not nowait else []) 625 626 def exchange_declare(self, 627 exchange, 628 exchange_type='direct', 629 passive=False, 630 durable=False, 631 auto_delete=False, 632 internal=False, 633 arguments=None, 634 callback=None): 635 """This method creates an exchange if it does not already exist, and if 636 the exchange exists, verifies that it is of the correct and expected 637 class. 638 639 If passive set, the server will reply with Declare-Ok if the exchange 640 already exists with the same name, and raise an error if not and if the 641 exchange does not already exist, the server MUST raise a channel 642 exception with reply code 404 (not found). 
643 644 :param str exchange: The exchange name consists of a non-empty sequence 645 of these characters: letters, digits, hyphen, underscore, period, 646 or colon 647 :param str exchange_type: The exchange type to use 648 :param bool passive: Perform a declare or just check to see if it exists 649 :param bool durable: Survive a reboot of RabbitMQ 650 :param bool auto_delete: Remove when no more queues are bound to it 651 :param bool internal: Can only be published to by other exchanges 652 :param dict arguments: Custom key/value pair arguments for the exchange 653 :param callable callback: callback(pika.frame.Method) for method Exchange.DeclareOk 654 :raises ValueError: 655 656 """ 657 validators.require_string(exchange, 'exchange') 658 self._raise_if_not_open() 659 nowait = validators.rpc_completion_callback(callback) 660 return self._rpc( 661 spec.Exchange.Declare(0, exchange, exchange_type, passive, durable, 662 auto_delete, internal, nowait, arguments or 663 dict()), callback, 664 [spec.Exchange.DeclareOk] if not nowait else []) 665 666 def exchange_delete(self, exchange=None, if_unused=False, callback=None): 667 """Delete the exchange. 668 669 :param str exchange: The exchange name 670 :param bool if_unused: only delete if the exchange is unused 671 :param callable callback: callback(pika.frame.Method) for method Exchange.DeleteOk 672 :raises ValueError: 673 674 """ 675 self._raise_if_not_open() 676 nowait = validators.rpc_completion_callback(callback) 677 return self._rpc(spec.Exchange.Delete(0, exchange, if_unused, 678 nowait), callback, 679 [spec.Exchange.DeleteOk] if not nowait else []) 680 681 def exchange_unbind(self, 682 destination=None, 683 source=None, 684 routing_key='', 685 arguments=None, 686 callback=None): 687 """Unbind an exchange from another exchange. 
688 689 :param str destination: The destination exchange to unbind 690 :param str source: The source exchange to unbind from 691 :param str routing_key: The routing key to unbind 692 :param dict arguments: Custom key/value pair arguments for the binding 693 :param callable callback: callback(pika.frame.Method) for method Exchange.UnbindOk 694 :raises ValueError: 695 696 """ 697 self._raise_if_not_open() 698 nowait = validators.rpc_completion_callback(callback) 699 return self._rpc( 700 spec.Exchange.Unbind(0, destination, source, routing_key, nowait, 701 arguments), callback, 702 [spec.Exchange.UnbindOk] if not nowait else []) 703 704 def flow(self, active, callback=None): 705 """Turn Channel flow control off and on. Pass a callback to be notified 706 of the response from the server. active is a bool. Callback should 707 expect a bool in response indicating channel flow state. For more 708 information, please reference: 709 710 http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow 711 712 :param bool active: Turn flow on or off 713 :param callable callback: callback(bool) upon completion 714 :raises ValueError: 715 716 """ 717 self._raise_if_not_open() 718 validators.rpc_completion_callback(callback) 719 self._on_flowok_callback = callback 720 self._rpc(spec.Channel.Flow(active), self._on_flowok, 721 [spec.Channel.FlowOk]) 722 723 @property 724 def is_closed(self): 725 """Returns True if the channel is closed. 726 727 :rtype: bool 728 729 """ 730 return self._state == self.CLOSED 731 732 @property 733 def is_closing(self): 734 """Returns True if client-initiated closing of the channel is in 735 progress. 736 737 :rtype: bool 738 739 """ 740 return self._state == self.CLOSING 741 742 @property 743 def is_open(self): 744 """Returns True if the channel is open. 
745 746 :rtype: bool 747 748 """ 749 return self._state == self.OPEN 750 751 def open(self): 752 """Open the channel""" 753 self._set_state(self.OPENING) 754 self._add_callbacks() 755 self._rpc(spec.Channel.Open(), self._on_openok, [spec.Channel.OpenOk]) 756 757 def queue_bind(self, 758 queue, 759 exchange, 760 routing_key=None, 761 arguments=None, 762 callback=None): 763 """Bind the queue to the specified exchange 764 765 :param str queue: The queue to bind to the exchange 766 :param str exchange: The source exchange to bind to 767 :param str routing_key: The routing key to bind on 768 :param dict arguments: Custom key/value pair arguments for the binding 769 :param callable callback: callback(pika.frame.Method) for method Queue.BindOk 770 :raises ValueError: 771 772 """ 773 validators.require_string(queue, 'queue') 774 validators.require_string(exchange, 'exchange') 775 self._raise_if_not_open() 776 nowait = validators.rpc_completion_callback(callback) 777 if routing_key is None: 778 routing_key = queue 779 return self._rpc( 780 spec.Queue.Bind(0, queue, exchange, routing_key, nowait, 781 arguments or dict()), callback, 782 [spec.Queue.BindOk] if not nowait else []) 783 784 def queue_declare(self, 785 queue, 786 passive=False, 787 durable=False, 788 exclusive=False, 789 auto_delete=False, 790 arguments=None, 791 callback=None): 792 """Declare queue, create if needed. This method creates or checks a 793 queue. When creating a new queue the client can specify various 794 properties that control the durability of the queue and its contents, 795 and the level of sharing for the queue. 
796 797 Use an empty string as the queue name for the broker to auto-generate 798 one 799 800 :param str queue: The queue name; if empty string, the broker will 801 create a unique queue name 802 :param bool passive: Only check to see if the queue exists 803 :param bool durable: Survive reboots of the broker 804 :param bool exclusive: Only allow access by the current connection 805 :param bool auto_delete: Delete after consumer cancels or disconnects 806 :param dict arguments: Custom key/value arguments for the queue 807 :param callable callback: callback(pika.frame.Method) for method Queue.DeclareOk 808 :raises ValueError: 809 810 """ 811 validators.require_string(queue, 'queue') 812 self._raise_if_not_open() 813 nowait = validators.rpc_completion_callback(callback) 814 815 if queue: 816 condition = (spec.Queue.DeclareOk, {'queue': queue}) 817 else: 818 condition = spec.Queue.DeclareOk 819 replies = [condition] if not nowait else [] 820 821 return self._rpc( 822 spec.Queue.Declare(0, queue, passive, durable, exclusive, 823 auto_delete, nowait, arguments or dict()), 824 callback, replies) 825 826 def queue_delete(self, 827 queue, 828 if_unused=False, 829 if_empty=False, 830 callback=None): 831 """Delete a queue from the broker. 
832 833 :param str queue: The queue to delete 834 :param bool if_unused: only delete if it's unused 835 :param bool if_empty: only delete if the queue is empty 836 :param callable callback: callback(pika.frame.Method) for method Queue.DeleteOk 837 :raises ValueError: 838 839 """ 840 self._raise_if_not_open() 841 validators.require_string(queue, 'queue') 842 nowait = validators.rpc_completion_callback(callback) 843 replies = [spec.Queue.DeleteOk] if not nowait else [] 844 return self._rpc( 845 spec.Queue.Delete(0, queue, if_unused, if_empty, nowait), callback, 846 replies) 847 848 def queue_purge(self, queue, callback=None): 849 """Purge all of the messages from the specified queue 850 851 :param str queue: The queue to purge 852 :param callable callback: callback(pika.frame.Method) for method Queue.PurgeOk 853 :raises ValueError: 854 855 """ 856 self._raise_if_not_open() 857 validators.require_string(queue, 'queue') 858 nowait = validators.rpc_completion_callback(callback) 859 replies = [spec.Queue.PurgeOk] if not nowait else [] 860 return self._rpc(spec.Queue.Purge(0, queue, nowait), callback, replies) 861 862 def queue_unbind(self, 863 queue, 864 exchange=None, 865 routing_key=None, 866 arguments=None, 867 callback=None): 868 """Unbind a queue from an exchange. 
869 870 :param str queue: The queue to unbind from the exchange 871 :param str exchange: The source exchange to bind from 872 :param str routing_key: The routing key to unbind 873 :param dict arguments: Custom key/value pair arguments for the binding 874 :param callable callback: callback(pika.frame.Method) for method Queue.UnbindOk 875 :raises ValueError: 876 877 """ 878 self._raise_if_not_open() 879 validators.require_string(queue, 'queue') 880 validators.rpc_completion_callback(callback) 881 if routing_key is None: 882 routing_key = queue 883 return self._rpc( 884 spec.Queue.Unbind(0, queue, exchange, routing_key, arguments or 885 dict()), callback, [spec.Queue.UnbindOk]) 886 887 def tx_commit(self, callback=None): 888 """Commit a transaction 889 890 :param callable callback: The callback for delivery confirmations 891 :raises ValueError: 892 893 """ 894 self._raise_if_not_open() 895 validators.rpc_completion_callback(callback) 896 return self._rpc(spec.Tx.Commit(), callback, [spec.Tx.CommitOk]) 897 898 def tx_rollback(self, callback=None): 899 """Rollback a transaction. 900 901 :param callable callback: The callback for delivery confirmations 902 :raises ValueError: 903 904 """ 905 self._raise_if_not_open() 906 validators.rpc_completion_callback(callback) 907 return self._rpc(spec.Tx.Rollback(), callback, [spec.Tx.RollbackOk]) 908 909 def tx_select(self, callback=None): 910 """Select standard transaction mode. This method sets the channel to use 911 standard transactions. The client must use this method at least once on 912 a channel before using the Commit or Rollback methods. 
913 914 :param callable callback: The callback for delivery confirmations 915 :raises ValueError: 916 917 """ 918 self._raise_if_not_open() 919 validators.rpc_completion_callback(callback) 920 return self._rpc(spec.Tx.Select(), callback, [spec.Tx.SelectOk]) 921 922 # Internal methods 923 924 def _add_callbacks(self): 925 """Callbacks that add the required behavior for a channel when 926 connecting and connected to a server. 927 928 """ 929 # Add a callback for Basic.GetEmpty 930 self.callbacks.add(self.channel_number, spec.Basic.GetEmpty, 931 self._on_getempty, False) 932 933 # Add a callback for Basic.Cancel 934 self.callbacks.add(self.channel_number, spec.Basic.Cancel, 935 self._on_cancel, False) 936 937 # Deprecated in newer versions of RabbitMQ but still register for it 938 self.callbacks.add(self.channel_number, spec.Channel.Flow, 939 self._on_flow, False) 940 941 # Add a callback for when the server closes our channel 942 self.callbacks.add(self.channel_number, spec.Channel.Close, 943 self._on_close_from_broker, True) 944 945 def _add_on_cleanup_callback(self, callback): 946 """For internal use only (e.g., Connection needs to remove closed 947 channels from its channel container). Pass a callback function that will 948 be called when the channel is being cleaned up after all channel-close 949 callbacks callbacks. 
950 951 :param callable callback: The callback to call, having the 952 signature: callback(channel) 953 954 """ 955 self.callbacks.add(self.channel_number, 956 self._ON_CHANNEL_CLEANUP_CB_KEY, 957 callback, 958 one_shot=True, 959 only_caller=self) 960 961 def _cleanup(self): 962 """Remove all consumers and any callbacks for the channel.""" 963 self.callbacks.process(self.channel_number, 964 self._ON_CHANNEL_CLEANUP_CB_KEY, self, self) 965 self._consumers = dict() 966 self.callbacks.cleanup(str(self.channel_number)) 967 self._cookie = None 968 969 def _cleanup_consumer_ref(self, consumer_tag): 970 """Remove any references to the consumer tag in internal structures 971 for consumer state. 972 973 :param str consumer_tag: The consumer tag to cleanup 974 975 """ 976 self._consumers_with_noack.discard(consumer_tag) 977 self._consumers.pop(consumer_tag, None) 978 self._cancelled.discard(consumer_tag) 979 980 def _get_cookie(self): 981 """Used by the wrapper implementation (e.g., `BlockingChannel`) to 982 retrieve the cookie that it set via `_set_cookie` 983 984 :returns: opaque cookie value that was set via `_set_cookie` 985 :rtype: object 986 987 """ 988 return self._cookie 989 990 def _handle_content_frame(self, frame_value): 991 """This is invoked by the connection when frames that are not registered 992 with the CallbackManager have been found. This should only be the case 993 when the frames are related to content delivery. 994 995 The _content_assembler will be invoked which will return the fully 996 formed message in three parts when all of the body frames have been 997 received. 
998 999 :param pika.amqp_object.Frame frame_value: The frame to deliver 1000 1001 """ 1002 try: 1003 response = self._content_assembler.process(frame_value) 1004 except exceptions.UnexpectedFrameError: 1005 self._on_unexpected_frame(frame_value) 1006 return 1007 1008 if response: 1009 if isinstance(response[0].method, spec.Basic.Deliver): 1010 self._on_deliver(*response) 1011 elif isinstance(response[0].method, spec.Basic.GetOk): 1012 self._on_getok(*response) 1013 elif isinstance(response[0].method, spec.Basic.Return): 1014 self._on_return(*response) 1015 1016 def _on_cancel(self, method_frame): 1017 """When the broker cancels a consumer, delete it from our internal 1018 dictionary. 1019 1020 :param pika.frame.Method method_frame: The method frame received 1021 1022 """ 1023 if method_frame.method.consumer_tag in self._cancelled: 1024 # User-initiated cancel is waiting for Cancel-ok 1025 return 1026 1027 self._cleanup_consumer_ref(method_frame.method.consumer_tag) 1028 1029 def _on_cancelok(self, method_frame): 1030 """Called in response to a frame from the Broker when the 1031 client sends Basic.Cancel 1032 1033 :param pika.frame.Method method_frame: The method frame received 1034 1035 """ 1036 self._cleanup_consumer_ref(method_frame.method.consumer_tag) 1037 1038 def _transition_to_closed(self): 1039 """Common logic for transitioning the channel to the CLOSED state: 1040 1041 Set state to CLOSED, dispatch callbacks registered via 1042 `Channel.add_on_close_callback()`, and mop up. 
    def _on_close_from_broker(self, method_frame):
        """Handle `Channel.Close` from broker.

        Logs the close, replies with `Channel.CloseOk`, records a
        `ChannelClosedByBroker` exception as the closing reason, and then
        either drains the blocked-method queue (when a client-initiated close
        is already in progress) or transitions the channel to CLOSED.

        :param pika.frame.Method method_frame: Method frame with Channel.Close
            method

        """
        LOGGER.warning('Received remote Channel.Close (%s): %r on %s',
                       method_frame.method.reply_code,
                       method_frame.method.reply_text, self)
        # Note, we should not be called when channel is already closed
        assert not self.is_closed

        # AMQP 0.9.1 requires CloseOk response to Channel.Close;
        self._send_method(spec.Channel.CloseOk())

        # Record the broker's reason. This replaces whatever closing reason
        # was stored before (e.g., one saved for a pending user-initiated
        # close), so the on-channel-close callbacks will see the broker's
        # reason.
        self._closing_reason = exceptions.ChannelClosedByBroker(
            method_frame.method.reply_code, method_frame.method.reply_text)

        if self.is_closing:
            # Since we may have already put Channel.Close on the wire, we need
            # to wait for CloseOk before cleaning up to avoid a race condition
            # whereby our channel number might get reused before our CloseOk
            # arrives
            #
            # NOTE: if our Channel.Close destined for the broker was blocked by
            # an earlier synchronous method, this call will drop it and perform
            # a meta-close (see `_on_close_meta()`), which transitions the
            # channel to CLOSED and dispatches the `'_on_channel_close'`
            # callbacks without waiting for `Channel.CloseOk`.
            self._drain_blocked_methods_on_remote_close()
        else:
            # No client-initiated close is pending: close immediately
            self._transition_to_closed()
1127 1128 :param pika.frame.Method method_frame: The method frame received 1129 :param pika.frame.Header header_frame: The header frame received 1130 :param bytes body: The body received 1131 1132 """ 1133 consumer_tag = method_frame.method.consumer_tag 1134 1135 if consumer_tag in self._cancelled: 1136 if self.is_open and consumer_tag not in self._consumers_with_noack: 1137 self.basic_reject(method_frame.method.delivery_tag) 1138 return 1139 1140 if consumer_tag not in self._consumers: 1141 LOGGER.error('Unexpected delivery: %r', method_frame) 1142 return 1143 1144 self._consumers[consumer_tag](self, method_frame.method, 1145 header_frame.properties, body) 1146 1147 def _on_eventok(self, method_frame): 1148 """Generic events that returned ok that may have internal callbacks. 1149 We keep a list of what we've yet to implement so that we don't silently 1150 drain events that we don't support. 1151 1152 :param pika.frame.Method method_frame: The method frame received 1153 1154 """ 1155 LOGGER.debug('Discarding frame %r', method_frame) 1156 1157 def _on_flow(self, _method_frame_unused): 1158 """Called if the server sends a Channel.Flow frame. 
1159 1160 :param pika.frame.Method method_frame_unused: The Channel.Flow frame 1161 1162 """ 1163 if self._has_on_flow_callback is False: 1164 LOGGER.warning('Channel.Flow received from server') 1165 1166 def _on_flowok(self, method_frame): 1167 """Called in response to us asking the server to toggle on Channel.Flow 1168 1169 :param pika.frame.Method method_frame: The method frame received 1170 1171 """ 1172 self.flow_active = method_frame.method.active 1173 if self._on_flowok_callback: 1174 self._on_flowok_callback(method_frame.method.active) 1175 self._on_flowok_callback = None 1176 else: 1177 LOGGER.warning('Channel.FlowOk received with no active callbacks') 1178 1179 def _on_getempty(self, method_frame): 1180 """When we receive an empty reply do nothing but log it 1181 1182 :param pika.frame.Method method_frame: The method frame received 1183 1184 """ 1185 LOGGER.debug('Received Basic.GetEmpty: %r', method_frame) 1186 if self._on_getok_callback is not None: 1187 self._on_getok_callback = None 1188 1189 def _on_getok(self, method_frame, header_frame, body): 1190 """Called in reply to a Basic.Get when there is a message. 1191 1192 :param pika.frame.Method method_frame: The method frame received 1193 :param pika.frame.Header header_frame: The header frame received 1194 :param bytes body: The body received 1195 1196 """ 1197 if self._on_getok_callback is not None: 1198 callback = self._on_getok_callback 1199 self._on_getok_callback = None 1200 callback(self, method_frame.method, header_frame.properties, body) 1201 else: 1202 LOGGER.error('Basic.GetOk received with no active callback') 1203 1204 def _on_openok(self, method_frame): 1205 """Called by our callback handler when we receive a Channel.OpenOk and 1206 subsequently calls our _on_openok_callback which was passed into the 1207 Channel constructor. 
The reason we do this is because we want to make 1208 sure that the on_open_callback parameter passed into the Channel 1209 constructor is not the first callback we make. 1210 1211 Suppress the state transition and callback if channel is already in 1212 CLOSING state. 1213 1214 :param pika.frame.Method method_frame: Channel.OpenOk frame 1215 1216 """ 1217 # Suppress OpenOk if the user or Connection.Close started closing it 1218 # before open completed. 1219 if self.is_closing: 1220 LOGGER.debug('Suppressing while in closing state: %s', method_frame) 1221 else: 1222 self._set_state(self.OPEN) 1223 if self._on_openok_callback is not None: 1224 self._on_openok_callback(self) 1225 1226 def _on_return(self, method_frame, header_frame, body): 1227 """Called if the server sends a Basic.Return frame. 1228 1229 :param pika.frame.Method method_frame: The Basic.Return frame 1230 :param pika.frame.Header header_frame: The content header frame 1231 :param bytes body: The message body 1232 1233 """ 1234 if not self.callbacks.process(self.channel_number, '_on_return', self, 1235 self, method_frame.method, 1236 header_frame.properties, body): 1237 LOGGER.warning('Basic.Return received from server (%r, %r)', 1238 method_frame.method, header_frame.properties) 1239 1240 def _on_selectok(self, method_frame): 1241 """Called when the broker sends a Confirm.SelectOk frame 1242 1243 :param pika.frame.Method method_frame: The method frame received 1244 1245 """ 1246 LOGGER.debug("Confirm.SelectOk Received: %r", method_frame) 1247 1248 def _on_synchronous_complete(self, _method_frame_unused): 1249 """This is called when a synchronous command is completed. It will undo 1250 the blocking state and send all the frames that stacked up while we 1251 were in the blocking state. 
    def _rpc(self, method, callback=None, acceptable_replies=None):
        """Make a synchronous channel RPC call for a synchronous method frame.
        If the channel is already in the blocking state, then enqueue the
        request, but don't send it at this time; it will be eventually sent by
        `_on_synchronous_complete` after the prior blocking request receives a
        response. If the channel is not in the blocking state and
        `acceptable_replies` is not empty, transition the channel to the
        blocking state and register for `_on_synchronous_complete` before
        sending the request.

        NOTE: A callback must be accompanied by non-empty acceptable_replies.

        :param pika.amqp_object.Method method: The AMQP method to invoke
        :param callable callback: The callback for the RPC response
        :param list|None acceptable_replies: A (possibly empty) sequence of
            replies this RPC call expects or None. Each entry is either a
            reply key or a `(reply, arguments)` tuple; the `arguments` part is
            forwarded to `callbacks.add` as its `arguments` keyword.
        :raises TypeError: if `acceptable_replies` is neither None nor a list,
            or if `callback` is neither None nor callable
        :raises ValueError: if `callback` is given without any acceptable
            replies
        :raises exceptions.ChannelWrongStateError: if `self.is_closed` is true
            (raised via `_raise_if_not_open`)

        """
        assert method.synchronous, (
            'Only synchronous-capable methods may be used with _rpc: %r' %
            (method,))

        # Validate we got None or a list of acceptable_replies
        if not isinstance(acceptable_replies, (type(None), list)):
            raise TypeError('acceptable_replies should be list or None')

        if callback is not None:
            # Validate the callback is callable
            if not callable(callback):
                raise TypeError('callback should be None or a callable')

            # Make sure that callback is accompanied by acceptable replies
            if not acceptable_replies:
                raise ValueError(
                    'Unexpected callback for asynchronous (nowait) operation.')

        # Make sure the channel is not closed yet. Only the closed case is
        # rejected here; other states are let through (presumably so that
        # open/close handshakes can use this method -- the callers are not
        # visible in this part of the file).
        if self.is_closed:
            self._raise_if_not_open()

        # If the channel is blocking, add subsequent commands to our stack
        if self._blocking:
            LOGGER.debug(
                'Already in blocking state, so enqueueing method %s; '
                'acceptable_replies=%r', method, acceptable_replies)
            # Stored as the positional arguments for a later
            # `self._rpc(*entry)` call from `_on_synchronous_complete`
            self._blocked.append([method, callback, acceptable_replies])
            return

        # Note: _send_method can throw exceptions if there are framing errors
        # or invalid data passed in. Call it here to prevent self._blocking
        # from being set if an exception is thrown. This also prevents
        # acceptable_replies registering callbacks when exceptions are thrown
        self._send_method(method)

        # If acceptable replies are set, add callbacks
        if acceptable_replies:
            # Block until a response frame is received for synchronous frames
            self._blocking = method.NAME
            LOGGER.debug(
                'Entering blocking state on frame %s; acceptable_replies=%r',
                method, acceptable_replies)

            for reply in acceptable_replies:
                # A tuple entry carries arguments to pass along with the reply
                # key when registering
                if isinstance(reply, tuple):
                    reply, arguments = reply
                else:
                    arguments = None
                # The unblocking handler is registered before the caller's
                # callback for every acceptable reply
                LOGGER.debug('Adding on_synchronous_complete callback')
                self.callbacks.add(self.channel_number,
                                   reply,
                                   self._on_synchronous_complete,
                                   arguments=arguments)
                if callback is not None:
                    LOGGER.debug('Adding passed-in RPC response callback')
                    self.callbacks.add(self.channel_number,
                                       reply,
                                       callback,
                                       arguments=arguments)
class ContentFrameAssembler(object):
    """Handle content related frames, building a message and return the message
    back in three parts upon receipt.

    A message is assembled from a content-bearing method frame, followed by a
    header frame, followed by zero or more body frames whose fragment lengths
    add up to the header's `body_size`.

    """

    def __init__(self):
        """Create a new instance of the content frame assembler.

        """
        # Method frame that started the message being assembled, or None
        self._method_frame = None
        # Header frame of the message being assembled, or None
        self._header_frame = None
        # Number of body bytes received so far for the current message
        self._seen_so_far = 0
        # Body fragments received so far, in arrival order
        self._body_fragments = []

    def process(self, frame_value):
        """Invoked by the Channel object when passed frames that are not
        setup in the rpc process and that don't have explicit reply types
        defined. This includes Basic.Deliver, Basic.GetOk and Basic.Return

        :param Method|Header|Body frame_value: The frame to process
        :returns: the assembled `(method_frame, header_frame, body)` tuple once
            the message is complete, otherwise None
        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)|None
        :raises pika.exceptions.UnexpectedFrameError: if the frame is not a
            content-bearing method, header or body frame, or if a header or
            body frame arrives out of sequence
        :raises pika.exceptions.BodyTooLongError: if more body bytes arrive
            than the header announced

        """
        if (isinstance(frame_value, frame.Method) and
                spec.has_content(frame_value.method.INDEX)):
            self._method_frame = frame_value
            return None

        if isinstance(frame_value, frame.Header):
            # A header is only meaningful after the method frame that starts
            # the message; otherwise we would hand out a tuple whose method
            # frame is None.
            if self._method_frame is None:
                raise exceptions.UnexpectedFrameError(frame_value)
            self._header_frame = frame_value
            if frame_value.body_size == 0:
                # No body frames will follow an empty message
                return self._finish()
            return None

        if isinstance(frame_value, frame.Body):
            return self._handle_body_frame(frame_value)

        raise exceptions.UnexpectedFrameError(frame_value)

    def _finish(self):
        """Invoked when all of the message has been received

        Returns the assembled message and resets the assembler for the next
        one.

        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)

        """
        content = (self._method_frame, self._header_frame,
                   b''.join(self._body_fragments))
        self._reset()
        return content

    def _handle_body_frame(self, body_frame):
        """Receive body frames and append them to the stack. When the body size
        matches, call the finish method.

        :param Body body_frame: The body frame
        :raises pika.exceptions.UnexpectedFrameError: if no header frame has
            been received for the current message
        :raises pika.exceptions.BodyTooLongError: if the accumulated body is
            longer than the header's `body_size`
        :rtype: tuple(pika.frame.Method, pika.frame.Header, bytes)|None

        """
        header = self._header_frame
        if header is None:
            # Body without a preceding header: reject it before touching any
            # state instead of failing on `None.body_size` half-way through.
            raise exceptions.UnexpectedFrameError(body_frame)

        self._seen_so_far += len(body_frame.fragment)
        self._body_fragments.append(body_frame.fragment)

        if self._seen_so_far == header.body_size:
            return self._finish()

        if self._seen_so_far > header.body_size:
            seen, expected = self._seen_so_far, header.body_size
            # Do not let the oversized partial message leak into the next one
            self._reset()
            raise exceptions.BodyTooLongError(seen, expected)

        return None

    def _reset(self):
        """Reset the values for processing frames"""
        self._method_frame = None
        self._header_frame = None
        self._seen_so_far = 0
        self._body_fragments = []