1"""The blocking connection adapter module implements blocking semantics on top 2of Pika's core AMQP driver. While most of the asynchronous expectations are 3removed when using the blocking connection adapter, it attempts to remain true 4to the asynchronous RPC nature of the AMQP protocol, supporting server sent 5RPC commands. 6 7The user facing classes in the module consist of the 8:py:class:`~pika.adapters.blocking_connection.BlockingConnection` 9and the :class:`~pika.adapters.blocking_connection.BlockingChannel` 10classes. 11 12""" 13# Suppress too-many-lines 14# pylint: disable=C0302 15 16# Disable "access to protected member warnings: this wrapper implementation is 17# a friend of those instances 18# pylint: disable=W0212 19 20from collections import namedtuple, deque 21import contextlib 22import functools 23import logging 24import threading 25 26import pika.compat as compat 27import pika.exceptions as exceptions 28import pika.spec 29import pika.validators as validators 30from pika.adapters.utils import connection_workflow 31 32# NOTE: import SelectConnection after others to avoid circular depenency 33from pika.adapters import select_connection 34 35LOGGER = logging.getLogger(__name__) 36 37 38class _CallbackResult(object): 39 """ CallbackResult is a non-thread-safe implementation for receiving 40 callback results; INTERNAL USE ONLY! 41 """ 42 __slots__ = ('_value_class', '_ready', '_values') 43 44 def __init__(self, value_class=None): 45 """ 46 :param callable value_class: only needed if the CallbackResult 47 instance will be used with 48 `set_value_once` and `append_element`. 49 *args and **kwargs of the value setter 50 methods will be passed to this class. 51 52 """ 53 self._value_class = value_class 54 self._ready = None 55 self._values = None 56 self.reset() 57 58 def reset(self): 59 """Reset value, but not _value_class""" 60 self._ready = False 61 self._values = None 62 63 def __bool__(self): 64 """ Called by python runtime to implement truth value testing and the 65 built-in operation bool(); NOTE: python 3.x 66 """ 67 return self.is_ready() 68 69 # python 2.x version of __bool__ 70 __nonzero__ = __bool__ 71 72 def __enter__(self): 73 """ Entry into context manager that automatically resets the object 74 on exit; this usage pattern helps garbage-collection by eliminating 75 potential circular references. 76 """ 77 return self 78 79 def __exit__(self, *args, **kwargs): 80 """Reset value""" 81 self.reset() 82 83 def is_ready(self): 84 """ 85 :returns: True if the object is in a signaled state 86 :rtype: bool 87 """ 88 return self._ready 89 90 @property 91 def ready(self): 92 """True if the object is in a signaled state""" 93 return self._ready 94 95 def signal_once(self, *_args, **_kwargs): 96 """ Set as ready 97 98 :raises AssertionError: if result was already signalled 99 """ 100 assert not self._ready, '_CallbackResult was already set' 101 self._ready = True 102 103 def set_value_once(self, *args, **kwargs): 104 """ Set as ready with value; the value may be retrieved via the `value` 105 property getter 106 107 :raises AssertionError: if result was already set 108 """ 109 self.signal_once() 110 try: 111 self._values = (self._value_class(*args, **kwargs),) 112 except Exception: 113 LOGGER.error( 114 "set_value_once failed: value_class=%r; args=%r; kwargs=%r", 115 self._value_class, args, kwargs) 116 raise 117 118 def append_element(self, *args, **kwargs): 119 """Append an element to values""" 120 assert not self._ready or isinstance(self._values, list), ( 121 '_CallbackResult state is incompatible with append_element: ' 122 'ready=%r; values=%r' % (self._ready, self._values)) 123 124 try: 125 value = self._value_class(*args, **kwargs) 126 except Exception: 127 LOGGER.error( 128 "append_element failed: value_class=%r; args=%r; kwargs=%r", 129 self._value_class, args, kwargs) 130 raise 131 132 if self._values is None: 133 self._values = [value] 134 else: 135 self._values.append(value) 136 137 self._ready = True 138 139 @property 140 def value(self): 141 """ 142 :returns: a reference to the value that was set via `set_value_once` 143 :rtype: object 144 :raises AssertionError: if result was not set or value is incompatible 145 with `set_value_once` 146 """ 147 assert self._ready, '_CallbackResult was not set' 148 assert isinstance(self._values, tuple) and len(self._values) == 1, ( 149 '_CallbackResult value is incompatible with set_value_once: %r' % 150 (self._values,)) 151 152 return self._values[0] 153 154 @property 155 def elements(self): 156 """ 157 :returns: a reference to the list containing one or more elements that 158 were added via `append_element` 159 :rtype: list 160 :raises AssertionError: if result was not set or value is incompatible 161 with `append_element` 162 """ 163 assert self._ready, '_CallbackResult was not set' 164 assert isinstance(self._values, list) and self._values, ( 165 '_CallbackResult value is incompatible with append_element: %r' % 166 (self._values,)) 167 168 return self._values 169 170 171class _IoloopTimerContext(object): 172 """Context manager for registering and safely unregistering a 173 SelectConnection ioloop-based timer 174 """ 175 176 def __init__(self, duration, connection): 177 """ 178 :param float duration: non-negative timer duration in seconds 179 :param select_connection.SelectConnection connection: 180 """ 181 assert hasattr(connection, '_adapter_call_later'), connection 182 self._duration = duration 183 self._connection = connection 184 self._callback_result = _CallbackResult() 185 self._timer_handle = None 186 187 def __enter__(self): 188 """Register a timer""" 189 self._timer_handle = self._connection._adapter_call_later( 190 self._duration, self._callback_result.signal_once) 191 return self 192 193 def __exit__(self, *_args, **_kwargs): 194 """Unregister timer if it hasn't fired yet""" 195 if not self._callback_result: 196 self._connection._adapter_remove_timeout(self._timer_handle) 197 self._timer_handle = None 198 199 def is_ready(self): 200 """ 201 :returns: True if timer has fired, False otherwise 202 :rtype: bool 203 """ 204 return self._callback_result.is_ready() 205 206 207class _TimerEvt(object): 208 """Represents a timer created via `BlockingConnection.call_later`""" 209 __slots__ = ('timer_id', '_callback') 210 211 def __init__(self, callback): 212 """ 213 :param callback: see callback in `BlockingConnection.call_later` 214 """ 215 self._callback = callback 216 217 # Will be set to timer id returned from the underlying implementation's 218 # `_adapter_call_later` method 219 self.timer_id = None 220 221 def __repr__(self): 222 return '<%s timer_id=%s callback=%s>' % (self.__class__.__name__, 223 self.timer_id, self._callback) 224 225 def dispatch(self): 226 """Dispatch the user's callback method""" 227 LOGGER.debug('_TimerEvt.dispatch: invoking callback=%r', self._callback) 228 self._callback() 229 230 231class _ConnectionBlockedUnblockedEvtBase(object): 232 """Base class for `_ConnectionBlockedEvt` and `_ConnectionUnblockedEvt`""" 233 __slots__ = ('_callback', '_method_frame') 234 235 def __init__(self, callback, method_frame): 236 """ 237 :param callback: see callback parameter in 238 `BlockingConnection.add_on_connection_blocked_callback` and 239 `BlockingConnection.add_on_connection_unblocked_callback` 240 :param pika.frame.Method method_frame: with method_frame.method of type 241 `pika.spec.Connection.Blocked` or `pika.spec.Connection.Unblocked` 242 """ 243 self._callback = callback 244 self._method_frame = method_frame 245 246 def __repr__(self): 247 return '<%s callback=%s, frame=%s>' % ( 248 self.__class__.__name__, self._callback, self._method_frame) 249 250 def dispatch(self): 251 """Dispatch the user's callback method""" 252 self._callback(self._method_frame) 253 254 255class _ConnectionBlockedEvt(_ConnectionBlockedUnblockedEvtBase): 256 """Represents a Connection.Blocked notification from RabbitMQ broker`""" 257 258 259class _ConnectionUnblockedEvt(_ConnectionBlockedUnblockedEvtBase): 260 """Represents a Connection.Unblocked notification from RabbitMQ broker`""" 261 262 263class BlockingConnection(object): 264 """The BlockingConnection creates a layer on top of Pika's asynchronous core 265 providing methods that will block until their expected response has 266 returned. Due to the asynchronous nature of the `Basic.Deliver` and 267 `Basic.Return` calls from RabbitMQ to your application, you can still 268 implement continuation-passing style asynchronous methods if you'd like to 269 receive messages from RabbitMQ using 270 :meth:`basic_consume <BlockingChannel.basic_consume>` or if you want to be 271 notified of a delivery failure when using 272 :meth:`basic_publish <BlockingChannel.basic_publish>`. 273 274 For more information about communicating with the blocking_connection 275 adapter, be sure to check out the 276 :class:`BlockingChannel <BlockingChannel>` class which implements the 277 :class:`Channel <pika.channel.Channel>` based communication for the 278 blocking_connection adapter. 279 280 To prevent recursion/reentrancy, the blocking connection and channel 281 implementations queue asynchronously-delivered events received 282 in nested context (e.g., while waiting for `BlockingConnection.channel` or 283 `BlockingChannel.queue_declare` to complete), dispatching them synchronously 284 once nesting returns to the desired context. This concerns all callbacks, 285 such as those registered via `BlockingConnection.call_later`, 286 `BlockingConnection.add_on_connection_blocked_callback`, 287 `BlockingConnection.add_on_connection_unblocked_callback`, 288 `BlockingChannel.basic_consume`, etc. 289 290 Blocked Connection deadlock avoidance: when RabbitMQ becomes low on 291 resources, it emits Connection.Blocked (AMQP extension) to the client 292 connection when client makes a resource-consuming request on that connection 293 or its channel (e.g., `Basic.Publish`); subsequently, RabbitMQ suspsends 294 processing requests from that connection until the affected resources are 295 restored. See http://www.rabbitmq.com/connection-blocked.html. This 296 may impact `BlockingConnection` and `BlockingChannel` operations in a 297 way that users might not be expecting. For example, if the user dispatches 298 `BlockingChannel.basic_publish` in non-publisher-confirmation mode while 299 RabbitMQ is in this low-resource state followed by a synchronous request 300 (e.g., `BlockingConnection.channel`, `BlockingChannel.consume`, 301 `BlockingChannel.basic_consume`, etc.), the synchronous request will block 302 indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If 303 the blocked state persists for a long time, the blocking operation will 304 appear to hang. In this state, `BlockingConnection` instance and its 305 channels will not dispatch user callbacks. SOLUTION: To break this potential 306 deadlock, applications may configure the `blocked_connection_timeout` 307 connection parameter when instantiating `BlockingConnection`. Upon blocked 308 connection timeout, this adapter will raise ConnectionBlockedTimeout 309 exception`. See `pika.connection.ConnectionParameters` documentation to 310 learn more about the `blocked_connection_timeout` configuration. 311 312 """ 313 # Connection-closing callback args 314 _OnClosedArgs = namedtuple('BlockingConnection__OnClosedArgs', 315 'connection error') 316 317 # Channel-opened callback args 318 _OnChannelOpenedArgs = namedtuple('BlockingConnection__OnChannelOpenedArgs', 319 'channel') 320 321 def __init__(self, parameters=None, _impl_class=None): 322 """Create a new instance of the Connection object. 323 324 :param None | pika.connection.Parameters | sequence parameters: 325 Connection parameters instance or non-empty sequence of them. If 326 None, a `pika.connection.Parameters` instance will be created with 327 default settings. See `pika.AMQPConnectionWorkflow` for more 328 details about multiple parameter configurations and retries. 329 :param _impl_class: for tests/debugging only; implementation class; 330 None=default 331 332 :raises RuntimeError: 333 334 """ 335 # Used for mutual exclusion to avoid race condition between 336 # BlockingConnection._cleanup() and another thread calling 337 # BlockingConnection.add_callback_threadsafe() against a closed 338 # ioloop. 339 self._cleanup_mutex = threading.Lock() 340 341 # Used by the _acquire_event_dispatch decorator; when already greater 342 # than 0, event dispatch is already acquired higher up the call stack 343 self._event_dispatch_suspend_depth = 0 344 345 # Connection-specific events that are ready for dispatch: _TimerEvt, 346 # _ConnectionBlockedEvt, _ConnectionUnblockedEvt 347 self._ready_events = deque() 348 349 # Channel numbers of channels that are requesting a call to their 350 # BlockingChannel._dispatch_events method; See 351 # `_request_channel_dispatch` 352 self._channels_pending_dispatch = set() 353 354 # Receives on_close_callback args from Connection 355 self._closed_result = _CallbackResult(self._OnClosedArgs) 356 357 # Perform connection workflow 358 self._impl = None # so that attribute is created in case below raises 359 self._impl = self._create_connection(parameters, _impl_class) 360 self._impl.add_on_close_callback(self._closed_result.set_value_once) 361 362 def __repr__(self): 363 return '<%s impl=%r>' % (self.__class__.__name__, self._impl) 364 365 def __enter__(self): 366 # Prepare `with` context 367 return self 368 369 def __exit__(self, exc_type, value, traceback): 370 # Close connection after `with` context 371 if self.is_open: 372 self.close() 373 374 def _cleanup(self): 375 """Clean up members that might inhibit garbage collection 376 377 """ 378 with self._cleanup_mutex: 379 if self._impl is not None: 380 self._impl.ioloop.close() 381 self._ready_events.clear() 382 self._closed_result.reset() 383 384 @contextlib.contextmanager 385 def _acquire_event_dispatch(self): 386 """ Context manager that controls access to event dispatcher for 387 preventing reentrancy. 388 389 The "as" value is True if the managed code block owns the event 390 dispatcher and False if caller higher up in the call stack already owns 391 it. Only managed code that gets ownership (got True) is permitted to 392 dispatch 393 """ 394 try: 395 # __enter__ part 396 self._event_dispatch_suspend_depth += 1 397 yield self._event_dispatch_suspend_depth == 1 398 finally: 399 # __exit__ part 400 self._event_dispatch_suspend_depth -= 1 401 402 def _create_connection(self, configs, impl_class): 403 """Run connection workflow, blocking until it completes. 404 405 :param None | pika.connection.Parameters | sequence configs: Connection 406 parameters instance or non-empty sequence of them. 407 :param None | SelectConnection impl_class: for tests/debugging only; 408 implementation class; 409 410 :rtype: impl_class 411 412 :raises: exception on failure 413 """ 414 415 if configs is None: 416 configs = (pika.connection.Parameters(),) 417 418 if isinstance(configs, pika.connection.Parameters): 419 configs = (configs,) 420 421 if not configs: 422 raise ValueError('Expected a non-empty sequence of connection ' 423 'parameters, but got {!r}.'.format(configs)) 424 425 # Connection workflow completion args 426 # `result` may be an instance of connection on success or exception on 427 # failure. 428 on_cw_done_result = _CallbackResult( 429 namedtuple('BlockingConnection_OnConnectionWorkflowDoneArgs', 430 'result')) 431 432 impl_class = impl_class or select_connection.SelectConnection 433 434 ioloop = select_connection.IOLoop() 435 436 ioloop.activate_poller() 437 try: 438 impl_class.create_connection( 439 configs, 440 on_done=on_cw_done_result.set_value_once, 441 custom_ioloop=ioloop) 442 443 while not on_cw_done_result.ready: 444 ioloop.poll() 445 ioloop.process_timeouts() 446 447 if isinstance(on_cw_done_result.value.result, BaseException): 448 error = on_cw_done_result.value.result 449 LOGGER.error('Connection workflow failed: %r', error) 450 raise self._reap_last_connection_workflow_error(error) 451 else: 452 LOGGER.info('Connection workflow succeeded: %r', 453 on_cw_done_result.value.result) 454 return on_cw_done_result.value.result 455 except Exception: 456 LOGGER.exception('Error in _create_connection().') 457 ioloop.close() 458 self._cleanup() 459 raise 460 461 @staticmethod 462 def _reap_last_connection_workflow_error(error): 463 """Extract exception value from the last connection attempt 464 465 :param Exception error: error passed by the `AMQPConnectionWorkflow` 466 completion callback. 467 468 :returns: Exception value from the last connection attempt 469 :rtype: Exception 470 """ 471 if isinstance(error, connection_workflow.AMQPConnectionWorkflowFailed): 472 # Extract exception value from the last connection attempt 473 error = error.exceptions[-1] 474 if isinstance(error, 475 connection_workflow.AMQPConnectorSocketConnectError): 476 error = exceptions.AMQPConnectionError(error) 477 elif isinstance(error, 478 connection_workflow.AMQPConnectorPhaseErrorBase): 479 error = error.exception 480 481 return error 482 483 def _flush_output(self, *waiters): 484 """ Flush output and process input while waiting for any of the given 485 callbacks to return true. The wait is aborted upon connection-close. 486 Otherwise, processing continues until the output is flushed AND at least 487 one of the callbacks returns true. If there are no callbacks, then 488 processing ends when all output is flushed. 489 490 :param waiters: sequence of zero or more callables taking no args and 491 returning true when it's time to stop processing. 492 Their results are OR'ed together. 493 :raises: exceptions passed by impl if opening of connection fails or 494 connection closes. 495 """ 496 if self.is_closed: 497 raise exceptions.ConnectionWrongStateError() 498 499 # Conditions for terminating the processing loop: 500 # connection closed 501 # OR 502 # empty outbound buffer and no waiters 503 # OR 504 # empty outbound buffer and any waiter is ready 505 is_done = (lambda: 506 self._closed_result.ready or 507 ((not self._impl._transport or 508 self._impl._get_write_buffer_size() == 0) and 509 (not waiters or any(ready() for ready in waiters)))) 510 511 # Process I/O until our completion condition is satisfied 512 while not is_done(): 513 self._impl.ioloop.poll() 514 self._impl.ioloop.process_timeouts() 515 516 if self._closed_result.ready: 517 try: 518 if not isinstance(self._closed_result.value.error, 519 exceptions.ConnectionClosedByClient): 520 LOGGER.error('Unexpected connection close detected: %r', 521 self._closed_result.value.error) 522 raise self._closed_result.value.error 523 else: 524 LOGGER.info('User-initiated close: result=%r', 525 self._closed_result.value) 526 finally: 527 self._cleanup() 528 529 def _request_channel_dispatch(self, channel_number): 530 """Called by BlockingChannel instances to request a call to their 531 _dispatch_events method or to terminate `process_data_events`; 532 BlockingConnection will honor these requests from a safe context. 533 534 :param int channel_number: positive channel number to request a call 535 to the channel's `_dispatch_events`; a negative channel number to 536 request termination of `process_data_events` 537 """ 538 self._channels_pending_dispatch.add(channel_number) 539 540 def _dispatch_channel_events(self): 541 """Invoke the `_dispatch_events` method on open channels that requested 542 it 543 """ 544 if not self._channels_pending_dispatch: 545 return 546 547 with self._acquire_event_dispatch() as dispatch_acquired: 548 if not dispatch_acquired: 549 # Nested dispatch or dispatch blocked higher in call stack 550 return 551 552 candidates = list(self._channels_pending_dispatch) 553 self._channels_pending_dispatch.clear() 554 555 for channel_number in candidates: 556 if channel_number < 0: 557 # This was meant to terminate process_data_events 558 continue 559 560 try: 561 impl_channel = self._impl._channels[channel_number] 562 except KeyError: 563 continue 564 565 if impl_channel.is_open: 566 impl_channel._get_cookie()._dispatch_events() 567 568 def _on_timer_ready(self, evt): 569 """Handle expiry of a timer that was registered via 570 `_adapter_call_later()` 571 572 :param _TimerEvt evt: 573 574 """ 575 self._ready_events.append(evt) 576 577 def _on_threadsafe_callback(self, user_callback): 578 """Handle callback that was registered via 579 `self._impl._adapter_add_callback_threadsafe`. 580 581 :param user_callback: callback passed to our 582 `add_callback_threadsafe` by the application. 583 584 """ 585 # Turn it into a 0-delay timeout to take advantage of our existing logic 586 # that deals with reentrancy 587 self.call_later(0, user_callback) 588 589 def _on_connection_blocked(self, user_callback, _impl, method_frame): 590 """Handle Connection.Blocked notification from RabbitMQ broker 591 592 :param callable user_callback: callback passed to 593 `add_on_connection_blocked_callback` 594 :param select_connection.SelectConnection _impl: 595 :param pika.frame.Method method_frame: method frame having `method` 596 member of type `pika.spec.Connection.Blocked` 597 """ 598 self._ready_events.append( 599 _ConnectionBlockedEvt(user_callback, method_frame)) 600 601 def _on_connection_unblocked(self, user_callback, _impl, method_frame): 602 """Handle Connection.Unblocked notification from RabbitMQ broker 603 604 :param callable user_callback: callback passed to 605 `add_on_connection_unblocked_callback` 606 :param select_connection.SelectConnection _impl: 607 :param pika.frame.Method method_frame: method frame having `method` 608 member of type `pika.spec.Connection.Blocked` 609 """ 610 self._ready_events.append( 611 _ConnectionUnblockedEvt(user_callback, method_frame)) 612 613 def _dispatch_connection_events(self): 614 """Dispatch ready connection events""" 615 if not self._ready_events: 616 return 617 618 with self._acquire_event_dispatch() as dispatch_acquired: 619 if not dispatch_acquired: 620 # Nested dispatch or dispatch blocked higher in call stack 621 return 622 623 # Limit dispatch to the number of currently ready events to avoid 624 # getting stuck in this loop 625 for _ in compat.xrange(len(self._ready_events)): 626 try: 627 evt = self._ready_events.popleft() 628 except IndexError: 629 # Some events (e.g., timers) must have been cancelled 630 break 631 632 evt.dispatch() 633 634 def add_on_connection_blocked_callback(self, callback): 635 """RabbitMQ AMQP extension - Add a callback to be notified when the 636 connection gets blocked (`Connection.Blocked` received from RabbitMQ) 637 due to the broker running low on resources (memory or disk). In this 638 state RabbitMQ suspends processing incoming data until the connection 639 is unblocked, so it's a good idea for publishers receiving this 640 notification to suspend publishing until the connection becomes 641 unblocked. 642 643 NOTE: due to the blocking nature of BlockingConnection, if it's sending 644 outbound data while the connection is/becomes blocked, the call may 645 remain blocked until the connection becomes unblocked, if ever. You 646 may use `ConnectionParameters.blocked_connection_timeout` to abort a 647 BlockingConnection method call with an exception when the connection 648 remains blocked longer than the given timeout value. 649 650 See also `Connection.add_on_connection_unblocked_callback()` 651 652 See also `ConnectionParameters.blocked_connection_timeout`. 653 654 :param callable callback: Callback to call on `Connection.Blocked`, 655 having the signature `callback(connection, pika.frame.Method)`, 656 where connection is the `BlockingConnection` instance and the method 657 frame's `method` member is of type `pika.spec.Connection.Blocked` 658 659 """ 660 self._impl.add_on_connection_blocked_callback( 661 functools.partial(self._on_connection_blocked, 662 functools.partial(callback, self))) 663 664 def add_on_connection_unblocked_callback(self, callback): 665 """RabbitMQ AMQP extension - Add a callback to be notified when the 666 connection gets unblocked (`Connection.Unblocked` frame is received from 667 RabbitMQ) letting publishers know it's ok to start publishing again. 668 669 :param callable callback: Callback to call on Connection.Unblocked`, 670 having the signature `callback(connection, pika.frame.Method)`, 671 where connection is the `BlockingConnection` instance and the method 672 frame's `method` member is of type `pika.spec.Connection.Unblocked` 673 674 """ 675 self._impl.add_on_connection_unblocked_callback( 676 functools.partial(self._on_connection_unblocked, 677 functools.partial(callback, self))) 678 679 def call_later(self, delay, callback): 680 """Create a single-shot timer to fire after delay seconds. Do not 681 confuse with Tornado's timeout where you pass in the time you want to 682 have your callback called. Only pass in the seconds until it's to be 683 called. 684 685 NOTE: the timer callbacks are dispatched only in the scope of 686 specially-designated methods: see 687 `BlockingConnection.process_data_events()` and 688 `BlockingChannel.start_consuming()`. 689 690 :param float delay: The number of seconds to wait to call callback 691 :param callable callback: The callback method with the signature 692 callback() 693 :returns: Opaque timer id 694 :rtype: int 695 696 """ 697 validators.require_callback(callback) 698 699 evt = _TimerEvt(callback=callback) 700 timer_id = self._impl._adapter_call_later( 701 delay, functools.partial(self._on_timer_ready, evt)) 702 evt.timer_id = timer_id 703 704 return timer_id 705 706 def add_callback_threadsafe(self, callback): 707 """Requests a call to the given function as soon as possible in the 708 context of this connection's thread. 709 710 NOTE: This is the only thread-safe method in `BlockingConnection`. All 711 other manipulations of `BlockingConnection` must be performed from the 712 connection's thread. 713 714 NOTE: the callbacks are dispatched only in the scope of 715 specially-designated methods: see 716 `BlockingConnection.process_data_events()` and 717 `BlockingChannel.start_consuming()`. 718 719 For example, a thread may request a call to the 720 `BlockingChannel.basic_ack` method of a `BlockingConnection` that is 721 running in a different thread via 722 723 ``` 724 connection.add_callback_threadsafe( 725 functools.partial(channel.basic_ack, delivery_tag=...)) 726 ``` 727 728 NOTE: if you know that the requester is running on the same thread as 729 the connection it is more efficient to use the 730 `BlockingConnection.call_later()` method with a delay of 0. 731 732 :param callable callback: The callback method; must be callable 733 :raises pika.exceptions.ConnectionWrongStateError: if connection is 734 closed 735 """ 736 with self._cleanup_mutex: 737 # NOTE: keep in mind that we may be called from another thread and 738 # this mutex only synchronizes us with our connection cleanup logic, 739 # so a simple check for "is_closed" is pretty much all we're allowed 740 # to do here besides calling the only thread-safe method 741 # _adapter_add_callback_threadsafe(). 742 if self.is_closed: 743 raise exceptions.ConnectionWrongStateError( 744 'BlockingConnection.add_callback_threadsafe() called on ' 745 'closed or closing connection.') 746 747 self._impl._adapter_add_callback_threadsafe( 748 functools.partial(self._on_threadsafe_callback, callback)) 749 750 def remove_timeout(self, timeout_id): 751 """Remove a timer if it's still in the timeout stack 752 753 :param timeout_id: The opaque timer id to remove 754 755 """ 756 # Remove from the impl's timeout stack 757 self._impl._adapter_remove_timeout(timeout_id) 758 759 # Remove from ready events, if the timer fired already 760 for i, evt in enumerate(self._ready_events): 761 if isinstance(evt, _TimerEvt) and evt.timer_id == timeout_id: 762 index_to_remove = i 763 break 764 else: 765 # Not found 766 return 767 768 del self._ready_events[index_to_remove] 769 770 def close(self, reply_code=200, reply_text='Normal shutdown'): 771 """Disconnect from RabbitMQ. If there are any open channels, it will 772 attempt to close them prior to fully disconnecting. Channels which 773 have active consumers will attempt to send a Basic.Cancel to RabbitMQ 774 to cleanly stop the delivery of messages prior to closing the channel. 775 776 :param int reply_code: The code number for the close 777 :param str reply_text: The text reason for the close 778 779 :raises pika.exceptions.ConnectionWrongStateError: if called on a closed 780 connection (NEW in v1.0.0) 781 """ 782 if not self.is_open: 783 msg = '{}.close({}, {!r}) called on closed connection.'.format( 784 self.__class__.__name__, reply_code, reply_text) 785 LOGGER.error(msg) 786 raise exceptions.ConnectionWrongStateError(msg) 787 788 LOGGER.info('Closing connection (%s): %s', reply_code, reply_text) 789 790 # Close channels that remain opened 791 for impl_channel in compat.dictvalues(self._impl._channels): 792 channel = impl_channel._get_cookie() 793 if channel.is_open: 794 try: 795 channel.close(reply_code, reply_text) 796 except exceptions.ChannelClosed as exc: 797 # Log and suppress broker-closed channel 798 LOGGER.warning( 799 'Got ChannelClosed while closing channel ' 800 'from connection.close: %r', exc) 801 802 # Close the connection 803 self._impl.close(reply_code, reply_text) 804 805 self._flush_output(self._closed_result.is_ready) 806 807 def process_data_events(self, time_limit=0): 808 """Will make sure that data events are processed. Dispatches timer and 809 channel callbacks if not called from the scope of BlockingConnection or 810 BlockingChannel callback. Your app can block on this method. 811 812 :param float time_limit: suggested upper bound on processing time in 813 seconds. The actual blocking time depends on the granularity of the 814 underlying ioloop. Zero means return as soon as possible. None means 815 there is no limit on processing time and the function will block 816 until I/O produces actionable events. Defaults to 0 for backward 817 compatibility. This parameter is NEW in pika 0.10.0. 818 """ 819 with self._acquire_event_dispatch() as dispatch_acquired: 820 # Check if we can actually process pending events 821 common_terminator = lambda: bool(dispatch_acquired and 822 (self._channels_pending_dispatch or 823 self._ready_events)) 824 if time_limit is None: 825 self._flush_output(common_terminator) 826 else: 827 with _IoloopTimerContext(time_limit, self._impl) as timer: 828 self._flush_output(timer.is_ready, common_terminator) 829 830 if self._ready_events: 831 self._dispatch_connection_events() 832 833 if self._channels_pending_dispatch: 834 self._dispatch_channel_events() 835 836 def sleep(self, duration): 837 """A safer way to sleep than calling time.sleep() directly that would 838 keep the adapter from ignoring frames sent from the broker. The 839 connection will "sleep" or block the number of seconds specified in 840 duration in small intervals. 841 842 :param float duration: The time to sleep in seconds 843 844 """ 845 assert duration >= 0, duration 846 847 deadline = compat.time_now() + duration 848 time_limit = duration 849 # Process events at least once 850 while True: 851 self.process_data_events(time_limit) 852 time_limit = deadline - compat.time_now() 853 if time_limit <= 0: 854 break 855 856 def channel(self, channel_number=None): 857 """Create a new channel with the next available channel number or pass 858 in a channel number to use. Must be non-zero if you would like to 859 specify but it is recommended that you let Pika manage the channel 860 numbers. 861 862 :rtype: pika.adapters.blocking_connection.BlockingChannel 863 """ 864 with _CallbackResult(self._OnChannelOpenedArgs) as opened_args: 865 impl_channel = self._impl.channel( 866 channel_number=channel_number, 867 on_open_callback=opened_args.set_value_once) 868 869 # Create our proxy channel 870 channel = BlockingChannel(impl_channel, self) 871 872 # Link implementation channel with our proxy channel 873 impl_channel._set_cookie(channel) 874 875 # Drive I/O until Channel.Open-ok 876 channel._flush_output(opened_args.is_ready) 877 878 return channel 879 880 # 881 # Connections state properties 882 # 883 884 @property 885 def is_closed(self): 886 """ 887 Returns a boolean reporting the current connection state. 888 """ 889 return self._impl.is_closed 890 891 @property 892 def is_open(self): 893 """ 894 Returns a boolean reporting the current connection state. 895 """ 896 return self._impl.is_open 897 898 # 899 # Properties that reflect server capabilities for the current connection 900 # 901 902 @property 903 def basic_nack_supported(self): 904 """Specifies if the server supports basic.nack on the active connection. 905 906 :rtype: bool 907 908 """ 909 return self._impl.basic_nack 910 911 @property 912 def consumer_cancel_notify_supported(self): 913 """Specifies if the server supports consumer cancel notification on the 914 active connection. 915 916 :rtype: bool 917 918 """ 919 return self._impl.consumer_cancel_notify 920 921 @property 922 def exchange_exchange_bindings_supported(self): 923 """Specifies if the active connection supports exchange to exchange 924 bindings. 925 926 :rtype: bool 927 928 """ 929 return self._impl.exchange_exchange_bindings 930 931 @property 932 def publisher_confirms_supported(self): 933 """Specifies if the active connection can use publisher confirmations. 934 935 :rtype: bool 936 937 """ 938 return self._impl.publisher_confirms 939 940 # Legacy property names for backward compatibility 941 basic_nack = basic_nack_supported 942 consumer_cancel_notify = consumer_cancel_notify_supported 943 exchange_exchange_bindings = exchange_exchange_bindings_supported 944 publisher_confirms = publisher_confirms_supported 945 946 947class _ChannelPendingEvt(object): 948 """Base class for BlockingChannel pending events""" 949 950 951class _ConsumerDeliveryEvt(_ChannelPendingEvt): 952 """This event represents consumer message delivery `Basic.Deliver`; it 953 contains method, properties, and body of the delivered message. 954 """ 955 956 __slots__ = ('method', 'properties', 'body') 957 958 def __init__(self, method, properties, body): 959 """ 960 :param spec.Basic.Deliver method: NOTE: consumer_tag and delivery_tag 961 are valid only within source channel 962 :param spec.BasicProperties properties: message properties 963 :param bytes body: message body; empty string if no body 964 """ 965 self.method = method 966 self.properties = properties 967 self.body = body 968 969 970class _ConsumerCancellationEvt(_ChannelPendingEvt): 971 """This event represents server-initiated consumer cancellation delivered to 972 client via Basic.Cancel. After receiving Basic.Cancel, there will be no 973 further deliveries for the consumer identified by `consumer_tag` in 974 `Basic.Cancel` 975 """ 976 977 __slots__ = ('method_frame',) 978 979 def __init__(self, method_frame): 980 """ 981 :param pika.frame.Method method_frame: method frame with method of type 982 `spec.Basic.Cancel` 983 """ 984 self.method_frame = method_frame 985 986 def __repr__(self): 987 return '<%s method_frame=%r>' % (self.__class__.__name__, 988 self.method_frame) 989 990 @property 991 def method(self): 992 """method of type spec.Basic.Cancel""" 993 return self.method_frame.method 994 995 996class _ReturnedMessageEvt(_ChannelPendingEvt): 997 """This event represents a message returned by broker via `Basic.Return`""" 998 999 __slots__ = ('callback', 'channel', 'method', 'properties', 'body') 1000 1001 def __init__(self, callback, channel, method, properties, body): 1002 """ 1003 :param callable callback: user's callback, having the signature 1004 callback(channel, method, properties, body), where 1005 channel: pika.Channel 1006 method: pika.spec.Basic.Return 1007 properties: pika.spec.BasicProperties 1008 body: bytes 1009 :param pika.Channel channel: 1010 :param pika.spec.Basic.Return method: 1011 :param pika.spec.BasicProperties properties: 1012 :param bytes body: 1013 """ 1014 self.callback = callback 1015 self.channel = channel 1016 self.method = method 1017 self.properties = properties 1018 self.body = body 1019 1020 def __repr__(self): 1021 return ('<%s callback=%r channel=%r method=%r properties=%r ' 1022 'body=%.300r>') % (self.__class__.__name__, self.callback, 1023 self.channel, self.method, self.properties, 1024 self.body) 1025 1026 def dispatch(self): 1027 """Dispatch user's callback""" 1028 self.callback(self.channel, self.method, self.properties, self.body) 1029 1030 1031class ReturnedMessage(object): 1032 """Represents a message returned via Basic.Return in publish-acknowledgments 1033 mode 1034 """ 1035 1036 __slots__ = ('method', 'properties', 'body') 1037 1038 def __init__(self, method, properties, body): 1039 """ 1040 :param spec.Basic.Return method: 1041 :param spec.BasicProperties properties: message properties 1042 :param bytes body: message body; empty string if no body 1043 """ 1044 self.method = method 1045 self.properties = properties 1046 self.body = body 1047 1048 1049class _ConsumerInfo(object): 1050 """Information about an active consumer""" 1051 1052 __slots__ = ('consumer_tag', 'auto_ack', 'on_message_callback', 1053 'alternate_event_sink', 'state') 1054 1055 # Consumer states 1056 SETTING_UP = 1 1057 ACTIVE = 2 1058 TEARING_DOWN = 3 1059 CANCELLED_BY_BROKER = 4 1060 1061 def __init__(self, 1062 consumer_tag, 1063 auto_ack, 1064 on_message_callback=None, 1065 alternate_event_sink=None): 1066 """ 1067 NOTE: exactly one of callback/alternate_event_sink musts be non-None. 1068 1069 :param str consumer_tag: 1070 :param bool auto_ack: the no-ack value for the consumer 1071 :param callable on_message_callback: The function for dispatching messages to 1072 user, having the signature: 1073 on_message_callback(channel, method, properties, body) 1074 channel: BlockingChannel 1075 method: spec.Basic.Deliver 1076 properties: spec.BasicProperties 1077 body: bytes 1078 :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt 1079 and _ConsumerCancellationEvt objects will be diverted to this 1080 callback instead of being deposited in the channel's 1081 `_pending_events` container. Signature: 1082 alternate_event_sink(evt) 1083 """ 1084 assert (on_message_callback is None) != ( 1085 alternate_event_sink is None 1086 ), ('exactly one of on_message_callback/alternate_event_sink must be non-None', 1087 on_message_callback, alternate_event_sink) 1088 self.consumer_tag = consumer_tag 1089 self.auto_ack = auto_ack 1090 self.on_message_callback = on_message_callback 1091 self.alternate_event_sink = alternate_event_sink 1092 self.state = self.SETTING_UP 1093 1094 @property 1095 def setting_up(self): 1096 """True if in SETTING_UP state""" 1097 return self.state == self.SETTING_UP 1098 1099 @property 1100 def active(self): 1101 """True if in ACTIVE state""" 1102 return self.state == self.ACTIVE 1103 1104 @property 1105 def tearing_down(self): 1106 """True if in TEARING_DOWN state""" 1107 return self.state == self.TEARING_DOWN 1108 1109 @property 1110 def cancelled_by_broker(self): 1111 """True if in CANCELLED_BY_BROKER state""" 1112 return self.state == self.CANCELLED_BY_BROKER 1113 1114 1115class _QueueConsumerGeneratorInfo(object): 1116 """Container for information about the active queue consumer generator """ 1117 __slots__ = ('params', 'consumer_tag', 'pending_events') 1118 1119 def __init__(self, params, consumer_tag): 1120 """ 1121 :params tuple params: a three-tuple (queue, auto_ack, exclusive) that were 1122 used to create the queue consumer 1123 :param str consumer_tag: consumer tag 1124 """ 1125 self.params = params 1126 self.consumer_tag = consumer_tag 1127 #self.messages = deque() 1128 1129 # Holds pending events of types _ConsumerDeliveryEvt and 1130 # _ConsumerCancellationEvt 1131 self.pending_events = deque() 1132 1133 def __repr__(self): 1134 return '<%s params=%r consumer_tag=%r>' % ( 1135 self.__class__.__name__, self.params, self.consumer_tag) 1136 1137 1138class BlockingChannel(object): 1139 """The BlockingChannel implements blocking semantics for most things that 1140 one would use callback-passing-style for with the 1141 :py:class:`~pika.channel.Channel` class. In addition, 1142 the `BlockingChannel` class implements a :term:`generator` that allows 1143 you to :doc:`consume messages </examples/blocking_consumer_generator>` 1144 without using callbacks. 1145 1146 Example of creating a BlockingChannel:: 1147 1148 import pika 1149 1150 # Create our connection object 1151 connection = pika.BlockingConnection() 1152 1153 # The returned object will be a synchronous channel 1154 channel = connection.channel() 1155 1156 """ 1157 1158 # Used as value_class with _CallbackResult for receiving Basic.GetOk args 1159 _RxMessageArgs = namedtuple( 1160 'BlockingChannel__RxMessageArgs', 1161 [ 1162 'channel', # implementation pika.Channel instance 1163 'method', # Basic.GetOk 1164 'properties', # pika.spec.BasicProperties 1165 'body' # str, unicode, or bytes (python 3.x) 1166 ]) 1167 1168 # For use as value_class with any _CallbackResult that expects method_frame 1169 # as the only arg 1170 _MethodFrameCallbackResultArgs = namedtuple( 1171 'BlockingChannel__MethodFrameCallbackResultArgs', 'method_frame') 1172 1173 # Broker's basic-ack/basic-nack args when delivery confirmation is enabled; 1174 # may concern a single or multiple messages 1175 _OnMessageConfirmationReportArgs = namedtuple( 1176 'BlockingChannel__OnMessageConfirmationReportArgs', 'method_frame') 1177 1178 # For use as value_class with _CallbackResult expecting Channel.Flow 1179 # confirmation. 1180 _FlowOkCallbackResultArgs = namedtuple( 1181 'BlockingChannel__FlowOkCallbackResultArgs', 1182 'active' # True if broker will start or continue sending; False if not 1183 ) 1184 1185 _CONSUMER_CANCELLED_CB_KEY = 'blocking_channel_consumer_cancelled' 1186 1187 def __init__(self, channel_impl, connection): 1188 """Create a new instance of the Channel 1189 1190 :param pika.channel.Channel channel_impl: Channel implementation object 1191 as returned from SelectConnection.channel() 1192 :param BlockingConnection connection: The connection object 1193 1194 """ 1195 self._impl = channel_impl 1196 self._connection = connection 1197 1198 # A mapping of consumer tags to _ConsumerInfo for active consumers 1199 self._consumer_infos = dict() 1200 1201 # Queue consumer generator generator info of type 1202 # _QueueConsumerGeneratorInfo created by BlockingChannel.consume 1203 self._queue_consumer_generator = None 1204 1205 # Whether RabbitMQ delivery confirmation has been enabled 1206 self._delivery_confirmation = False 1207 1208 # Receives message delivery confirmation report (Basic.ack or 1209 # Basic.nack) from broker when delivery confirmations are enabled 1210 self._message_confirmation_result = _CallbackResult( 1211 self._OnMessageConfirmationReportArgs) 1212 1213 # deque of pending events: _ConsumerDeliveryEvt and 1214 # _ConsumerCancellationEvt objects that will be returned by 1215 # `BlockingChannel.get_event()` 1216 self._pending_events = deque() 1217 1218 # Holds a ReturnedMessage object representing a message received via 1219 # Basic.Return in publisher-acknowledgments mode. 1220 self._puback_return = None 1221 1222 # self._on_channel_closed() saves the reason exception here 1223 self._closing_reason = None # type: None | Exception 1224 1225 # Receives Basic.ConsumeOk reply from server 1226 self._basic_consume_ok_result = _CallbackResult() 1227 1228 # Receives args from Basic.GetEmpty response 1229 # http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get 1230 self._basic_getempty_result = _CallbackResult( 1231 self._MethodFrameCallbackResultArgs) 1232 1233 self._impl.add_on_cancel_callback(self._on_consumer_cancelled_by_broker) 1234 1235 self._impl.add_callback( 1236 self._basic_consume_ok_result.signal_once, 1237 replies=[pika.spec.Basic.ConsumeOk], 1238 one_shot=False) 1239 1240 self._impl.add_on_close_callback(self._on_channel_closed) 1241 1242 self._impl.add_callback( 1243 self._basic_getempty_result.set_value_once, 1244 replies=[pika.spec.Basic.GetEmpty], 1245 one_shot=False) 1246 1247 LOGGER.info("Created channel=%s", self.channel_number) 1248 1249 def __int__(self): 1250 """Return the channel object as its channel number 1251 1252 NOTE: inherited from legacy BlockingConnection; might be error-prone; 1253 use `channel_number` property instead. 1254 1255 :rtype: int 1256 1257 """ 1258 return self.channel_number 1259 1260 def __repr__(self): 1261 return '<%s impl=%r>' % (self.__class__.__name__, self._impl) 1262 1263 def __enter__(self): 1264 return self 1265 1266 def __exit__(self, exc_type, value, traceback): 1267 if self.is_open: 1268 self.close() 1269 1270 def _cleanup(self): 1271 """Clean up members that might inhibit garbage collection""" 1272 self._message_confirmation_result.reset() 1273 self._pending_events = deque() 1274 self._consumer_infos = dict() 1275 self._queue_consumer_generator = None 1276 1277 @property 1278 def channel_number(self): 1279 """Channel number""" 1280 return self._impl.channel_number 1281 1282 @property 1283 def connection(self): 1284 """The channel's BlockingConnection instance""" 1285 return self._connection 1286 1287 @property 1288 def is_closed(self): 1289 """Returns True if the channel is closed. 1290 1291 :rtype: bool 1292 1293 """ 1294 return self._impl.is_closed 1295 1296 @property 1297 def is_open(self): 1298 """Returns True if the channel is open. 1299 1300 :rtype: bool 1301 1302 """ 1303 return self._impl.is_open 1304 1305 @property 1306 def consumer_tags(self): 1307 """Property method that returns a list of consumer tags for active 1308 consumers 1309 1310 :rtype: list 1311 1312 """ 1313 return compat.dictkeys(self._consumer_infos) 1314 1315 _ALWAYS_READY_WAITERS = ((lambda: True),) 1316 1317 def _flush_output(self, *waiters): 1318 """ Flush output and process input while waiting for any of the given 1319 callbacks to return true. The wait is aborted upon channel-close or 1320 connection-close. 1321 Otherwise, processing continues until the output is flushed AND at least 1322 one of the callbacks returns true. If there are no callbacks, then 1323 processing ends when all output is flushed. 1324 1325 :param waiters: sequence of zero or more callables taking no args and 1326 returning true when it's time to stop processing. 1327 Their results are OR'ed together. An empty sequence is 1328 treated as equivalent to a waiter always returning true. 1329 """ 1330 if self.is_closed: 1331 self._impl._raise_if_not_open() 1332 1333 if not waiters: 1334 waiters = self._ALWAYS_READY_WAITERS 1335 1336 self._connection._flush_output(lambda: self.is_closed, *waiters) 1337 1338 if self.is_closed and isinstance(self._closing_reason, 1339 exceptions.ChannelClosedByBroker): 1340 raise self._closing_reason # pylint: disable=E0702 1341 1342 def _on_puback_message_returned(self, channel, method, properties, body): 1343 """Called as the result of Basic.Return from broker in 1344 publisher-acknowledgements mode. Saves the info as a ReturnedMessage 1345 instance in self._puback_return. 1346 1347 :param pika.Channel channel: our self._impl channel 1348 :param pika.spec.Basic.Return method: 1349 :param pika.spec.BasicProperties properties: message properties 1350 :param bytes body: returned message body; empty string if no body 1351 """ 1352 assert channel is self._impl, (channel.channel_number, 1353 self.channel_number) 1354 1355 assert isinstance(method, pika.spec.Basic.Return), method 1356 assert isinstance(properties, pika.spec.BasicProperties), (properties) 1357 1358 LOGGER.warning( 1359 "Published message was returned: _delivery_confirmation=%s; " 1360 "channel=%s; method=%r; properties=%r; body_size=%d; " 1361 "body_prefix=%.255r", self._delivery_confirmation, 1362 channel.channel_number, method, properties, 1363 len(body) if body is not None else None, body) 1364 1365 self._puback_return = ReturnedMessage(method, properties, body) 1366 1367 def _add_pending_event(self, evt): 1368 """Append an event to the channel's list of events that are ready for 1369 dispatch to user and signal our connection that this channel is ready 1370 for event dispatch 1371 1372 :param _ChannelPendingEvt evt: an event derived from _ChannelPendingEvt 1373 """ 1374 self._pending_events.append(evt) 1375 self.connection._request_channel_dispatch(self.channel_number) 1376 1377 def _on_channel_closed(self, _channel, reason): 1378 """Callback from impl notifying us that the channel has been closed. 1379 This may be as the result of user-, broker-, or internal connection 1380 clean-up initiated closing or meta-closing of the channel. 1381 1382 If it resulted from receiving `Channel.Close` from broker, we will 1383 expedite waking up of the event subsystem so that it may respond by 1384 raising `ChannelClosed` from user's context. 1385 1386 NOTE: We can't raise exceptions in callbacks in order to protect 1387 the integrity of the underlying implementation. BlockingConnection's 1388 underlying asynchronous connection adapter (SelectConnection) uses 1389 callbacks to communicate with us. If BlockingConnection leaks exceptions 1390 back into the I/O loop or the asynchronous connection adapter, we 1391 interrupt their normal workflow and introduce a high likelihood of state 1392 inconsistency. 1393 1394 See `pika.Channel.add_on_close_callback()` for additional documentation. 1395 1396 :param pika.Channel _channel: (unused) 1397 :param Exception reason: 1398 1399 """ 1400 LOGGER.debug('_on_channel_closed: %r; %r', reason, self) 1401 1402 self._closing_reason = reason 1403 1404 if isinstance(reason, exceptions.ChannelClosedByBroker): 1405 self._cleanup() 1406 1407 # Request urgent termination of `process_data_events()`, in case 1408 # it's executing or next time it will execute 1409 self.connection._request_channel_dispatch(-self.channel_number) 1410 1411 def _on_consumer_cancelled_by_broker(self, method_frame): 1412 """Called by impl when broker cancels consumer via Basic.Cancel. 1413 1414 This is a RabbitMQ-specific feature. The circumstances include deletion 1415 of queue being consumed as well as failure of a HA node responsible for 1416 the queue being consumed. 1417 1418 :param pika.frame.Method method_frame: method frame with the 1419 `spec.Basic.Cancel` method 1420 1421 """ 1422 evt = _ConsumerCancellationEvt(method_frame) 1423 1424 consumer = self._consumer_infos[method_frame.method.consumer_tag] 1425 1426 # Don't interfere with client-initiated cancellation flow 1427 if not consumer.tearing_down: 1428 consumer.state = _ConsumerInfo.CANCELLED_BY_BROKER 1429 1430 if consumer.alternate_event_sink is not None: 1431 consumer.alternate_event_sink(evt) 1432 else: 1433 self._add_pending_event(evt) 1434 1435 def _on_consumer_message_delivery(self, _channel, method, properties, body): 1436 """Called by impl when a message is delivered for a consumer 1437 1438 :param Channel channel: The implementation channel object 1439 :param spec.Basic.Deliver method: 1440 :param pika.spec.BasicProperties properties: message properties 1441 :param bytes body: delivered message body; empty string if no body 1442 """ 1443 evt = _ConsumerDeliveryEvt(method, properties, body) 1444 1445 consumer = self._consumer_infos[method.consumer_tag] 1446 1447 if consumer.alternate_event_sink is not None: 1448 consumer.alternate_event_sink(evt) 1449 else: 1450 self._add_pending_event(evt) 1451 1452 def _on_consumer_generator_event(self, evt): 1453 """Sink for the queue consumer generator's consumer events; append the 1454 event to queue consumer generator's pending events buffer. 1455 1456 :param evt: an object of type _ConsumerDeliveryEvt or 1457 _ConsumerCancellationEvt 1458 """ 1459 self._queue_consumer_generator.pending_events.append(evt) 1460 # Schedule termination of connection.process_data_events using a 1461 # negative channel number 1462 self.connection._request_channel_dispatch(-self.channel_number) 1463 1464 def _cancel_all_consumers(self): 1465 """Cancel all consumers. 1466 1467 NOTE: pending non-ackable messages will be lost; pending ackable 1468 messages will be rejected. 1469 1470 """ 1471 if self._consumer_infos: 1472 LOGGER.debug('Cancelling %i consumers', len(self._consumer_infos)) 1473 1474 if self._queue_consumer_generator is not None: 1475 # Cancel queue consumer generator 1476 self.cancel() 1477 1478 # Cancel consumers created via basic_consume 1479 for consumer_tag in compat.dictkeys(self._consumer_infos): 1480 self.basic_cancel(consumer_tag) 1481 1482 def _dispatch_events(self): 1483 """Called by BlockingConnection to dispatch pending events. 1484 1485 `BlockingChannel` schedules this callback via 1486 `BlockingConnection._request_channel_dispatch` 1487 """ 1488 while self._pending_events: 1489 evt = self._pending_events.popleft() 1490 1491 if type(evt) is _ConsumerDeliveryEvt: # pylint: disable=C0123 1492 consumer_info = self._consumer_infos[evt.method.consumer_tag] 1493 consumer_info.on_message_callback(self, evt.method, 1494 evt.properties, evt.body) 1495 1496 elif type(evt) is _ConsumerCancellationEvt: # pylint: disable=C0123 1497 del self._consumer_infos[evt.method_frame.method.consumer_tag] 1498 1499 self._impl.callbacks.process(self.channel_number, 1500 self._CONSUMER_CANCELLED_CB_KEY, 1501 self, evt.method_frame) 1502 else: 1503 evt.dispatch() 1504 1505 def close(self, reply_code=0, reply_text="Normal shutdown"): 1506 """Will invoke a clean shutdown of the channel with the AMQP Broker. 1507 1508 :param int reply_code: The reply code to close the channel with 1509 :param str reply_text: The reply text to close the channel with 1510 1511 """ 1512 LOGGER.debug('Channel.close(%s, %s)', reply_code, reply_text) 1513 1514 self._impl._raise_if_not_open() 1515 1516 try: 1517 # Cancel remaining consumers 1518 self._cancel_all_consumers() 1519 1520 # Close the channel 1521 self._impl.close(reply_code=reply_code, reply_text=reply_text) 1522 self._flush_output(lambda: self.is_closed) 1523 finally: 1524 self._cleanup() 1525 1526 def flow(self, active): 1527 """Turn Channel flow control off and on. 1528 1529 NOTE: RabbitMQ doesn't support active=False; per 1530 https://www.rabbitmq.com/specification.html: "active=false is not 1531 supported by the server. Limiting prefetch with basic.qos provides much 1532 better control" 1533 1534 For more information, please reference: 1535 1536 http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow 1537 1538 :param bool active: Turn flow on (True) or off (False) 1539 :returns: True if broker will start or continue sending; False if not 1540 :rtype: bool 1541 1542 """ 1543 with _CallbackResult(self._FlowOkCallbackResultArgs) as flow_ok_result: 1544 self._impl.flow( 1545 active=active, callback=flow_ok_result.set_value_once) 1546 self._flush_output(flow_ok_result.is_ready) 1547 return flow_ok_result.value.active 1548 1549 def add_on_cancel_callback(self, callback): 1550 """Pass a callback function that will be called when Basic.Cancel 1551 is sent by the broker. The callback function should receive a method 1552 frame parameter. 1553 1554 :param callable callback: a callable for handling broker's Basic.Cancel 1555 notification with the call signature: callback(method_frame) 1556 where method_frame is of type `pika.frame.Method` with method of 1557 type `spec.Basic.Cancel` 1558 1559 """ 1560 self._impl.callbacks.add( 1561 self.channel_number, 1562 self._CONSUMER_CANCELLED_CB_KEY, 1563 callback, 1564 one_shot=False) 1565 1566 def add_on_return_callback(self, callback): 1567 """Pass a callback function that will be called when a published 1568 message is rejected and returned by the server via `Basic.Return`. 1569 1570 :param callable callback: The method to call on callback with the 1571 signature callback(channel, method, properties, body), where 1572 channel: pika.Channel 1573 method: pika.spec.Basic.Return 1574 properties: pika.spec.BasicProperties 1575 body: bytes 1576 1577 """ 1578 self._impl.add_on_return_callback( 1579 lambda _channel, method, properties, body: ( 1580 self._add_pending_event( 1581 _ReturnedMessageEvt( 1582 callback, self, method, properties, body)))) 1583 1584 def basic_consume(self, 1585 queue, 1586 on_message_callback, 1587 auto_ack=False, 1588 exclusive=False, 1589 consumer_tag=None, 1590 arguments=None): 1591 """Sends the AMQP command Basic.Consume to the broker and binds messages 1592 for the consumer_tag to the consumer callback. If you do not pass in 1593 a consumer_tag, one will be automatically generated for you. Returns 1594 the consumer tag. 1595 1596 NOTE: the consumer callbacks are dispatched only in the scope of 1597 specially-designated methods: see 1598 `BlockingConnection.process_data_events` and 1599 `BlockingChannel.start_consuming`. 1600 1601 For more information about Basic.Consume, see: 1602 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume 1603 1604 :param str queue: The queue from which to consume 1605 :param callable on_message_callback: Required function for dispatching messages 1606 to user, having the signature: 1607 on_message_callback(channel, method, properties, body) 1608 channel: BlockingChannel 1609 method: spec.Basic.Deliver 1610 properties: spec.BasicProperties 1611 body: bytes 1612 :param bool auto_ack: if set to True, automatic acknowledgement mode will be used 1613 (see http://www.rabbitmq.com/confirms.html). This corresponds 1614 with the 'no_ack' parameter in the basic.consume AMQP 0.9.1 1615 method 1616 :param bool exclusive: Don't allow other consumers on the queue 1617 :param str consumer_tag: You may specify your own consumer tag; if left 1618 empty, a consumer tag will be generated automatically 1619 :param dict arguments: Custom key/value pair arguments for the consumer 1620 :returns: consumer tag 1621 :rtype: str 1622 :raises pika.exceptions.DuplicateConsumerTag: if consumer with given 1623 consumer_tag is already present. 1624 1625 """ 1626 validators.require_string(queue, 'queue') 1627 validators.require_callback(on_message_callback, 'on_message_callback') 1628 return self._basic_consume_impl( 1629 queue=queue, 1630 on_message_callback=on_message_callback, 1631 auto_ack=auto_ack, 1632 exclusive=exclusive, 1633 consumer_tag=consumer_tag, 1634 arguments=arguments) 1635 1636 def _basic_consume_impl(self, 1637 queue, 1638 auto_ack, 1639 exclusive, 1640 consumer_tag, 1641 arguments=None, 1642 on_message_callback=None, 1643 alternate_event_sink=None): 1644 """The low-level implementation used by `basic_consume` and `consume`. 1645 See `basic_consume` docstring for more info. 1646 1647 NOTE: exactly one of on_message_callback/alternate_event_sink musts be 1648 non-None. 1649 1650 This method has one additional parameter alternate_event_sink over the 1651 args described in `basic_consume`. 1652 1653 :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt 1654 and _ConsumerCancellationEvt objects will be diverted to this 1655 callback instead of being deposited in the channel's 1656 `_pending_events` container. Signature: 1657 alternate_event_sink(evt) 1658 1659 :raises pika.exceptions.DuplicateConsumerTag: if consumer with given 1660 consumer_tag is already present. 1661 1662 """ 1663 if (on_message_callback is None) == (alternate_event_sink is None): 1664 raise ValueError( 1665 ('exactly one of on_message_callback/alternate_event_sink must ' 1666 'be non-None', on_message_callback, alternate_event_sink)) 1667 1668 if not consumer_tag: 1669 # Need a consumer tag to register consumer info before sending 1670 # request to broker, because I/O might dispatch incoming messages 1671 # immediately following Basic.Consume-ok before _flush_output 1672 # returns 1673 consumer_tag = self._impl._generate_consumer_tag() 1674 1675 if consumer_tag in self._consumer_infos: 1676 raise exceptions.DuplicateConsumerTag(consumer_tag) 1677 1678 # Create new consumer 1679 self._consumer_infos[consumer_tag] = _ConsumerInfo( 1680 consumer_tag, 1681 auto_ack=auto_ack, 1682 on_message_callback=on_message_callback, 1683 alternate_event_sink=alternate_event_sink) 1684 1685 try: 1686 with self._basic_consume_ok_result as ok_result: 1687 tag = self._impl.basic_consume( 1688 on_message_callback=self._on_consumer_message_delivery, 1689 queue=queue, 1690 auto_ack=auto_ack, 1691 exclusive=exclusive, 1692 consumer_tag=consumer_tag, 1693 arguments=arguments) 1694 1695 assert tag == consumer_tag, (tag, consumer_tag) 1696 1697 self._flush_output(ok_result.is_ready) 1698 except Exception: 1699 # If channel was closed, self._consumer_infos will be empty 1700 if consumer_tag in self._consumer_infos: 1701 del self._consumer_infos[consumer_tag] 1702 # Schedule termination of connection.process_data_events using a 1703 # negative channel number 1704 self.connection._request_channel_dispatch(-self.channel_number) 1705 raise 1706 1707 # NOTE: Consumer could get cancelled by broker immediately after opening 1708 # (e.g., queue getting deleted externally) 1709 if self._consumer_infos[consumer_tag].setting_up: 1710 self._consumer_infos[consumer_tag].state = _ConsumerInfo.ACTIVE 1711 1712 return consumer_tag 1713 1714 def basic_cancel(self, consumer_tag): 1715 """This method cancels a consumer. This does not affect already 1716 delivered messages, but it does mean the server will not send any more 1717 messages for that consumer. The client may receive an arbitrary number 1718 of messages in between sending the cancel method and receiving the 1719 cancel-ok reply. 1720 1721 NOTE: When cancelling an auto_ack=False consumer, this implementation 1722 automatically Nacks and suppresses any incoming messages that have not 1723 yet been dispatched to the consumer's callback. However, when cancelling 1724 a auto_ack=True consumer, this method will return any pending messages 1725 that arrived before broker confirmed the cancellation. 1726 1727 :param str consumer_tag: Identifier for the consumer; the result of 1728 passing a consumer_tag that was created on another channel is 1729 undefined (bad things will happen) 1730 :returns: (NEW IN pika 0.10.0) empty sequence for a auto_ack=False 1731 consumer; for a auto_ack=True consumer, returns a (possibly empty) 1732 sequence of pending messages that arrived before broker confirmed 1733 the cancellation (this is done instead of via consumer's callback in 1734 order to prevent reentrancy/recursion. Each message is four-tuple: 1735 (channel, method, properties, body) 1736 channel: BlockingChannel 1737 method: spec.Basic.Deliver 1738 properties: spec.BasicProperties 1739 body: bytes 1740 :rtype: list 1741 """ 1742 try: 1743 consumer_info = self._consumer_infos[consumer_tag] 1744 except KeyError: 1745 LOGGER.warning( 1746 "User is attempting to cancel an unknown consumer=%s; " 1747 "already cancelled by user or broker?", consumer_tag) 1748 return [] 1749 1750 try: 1751 # Assertion failure here is most likely due to reentrance 1752 assert consumer_info.active or consumer_info.cancelled_by_broker, ( 1753 consumer_info.state) 1754 1755 # Assertion failure here signals disconnect between consumer state 1756 # in BlockingChannel and Channel 1757 assert (consumer_info.cancelled_by_broker or 1758 consumer_tag in self._impl._consumers), consumer_tag 1759 1760 auto_ack = consumer_info.auto_ack 1761 1762 consumer_info.state = _ConsumerInfo.TEARING_DOWN 1763 1764 with _CallbackResult() as cancel_ok_result: 1765 # Nack pending messages for auto_ack=False consumer 1766 if not auto_ack: 1767 pending_messages = self._remove_pending_deliveries( 1768 consumer_tag) 1769 if pending_messages: 1770 # NOTE: we use impl's basic_reject to avoid the 1771 # possibility of redelivery before basic_cancel takes 1772 # control of nacking. 1773 # NOTE: we can't use basic_nack with the multiple option 1774 # to avoid nacking messages already held by our client. 1775 for message in pending_messages: 1776 self._impl.basic_reject( 1777 message.method.delivery_tag, requeue=True) 1778 1779 # Cancel the consumer; impl takes care of rejecting any 1780 # additional deliveries that arrive for a auto_ack=False 1781 # consumer 1782 self._impl.basic_cancel( 1783 consumer_tag=consumer_tag, 1784 callback=cancel_ok_result.signal_once) 1785 1786 # Flush output and wait for Basic.Cancel-ok or 1787 # broker-initiated Basic.Cancel 1788 self._flush_output( 1789 cancel_ok_result.is_ready, 1790 lambda: consumer_tag not in self._impl._consumers) 1791 1792 if auto_ack: 1793 # Return pending messages for auto_ack=True consumer 1794 return [(evt.method, evt.properties, evt.body) 1795 for evt in self._remove_pending_deliveries(consumer_tag) 1796 ] 1797 else: 1798 # impl takes care of rejecting any incoming deliveries during 1799 # cancellation 1800 messages = self._remove_pending_deliveries(consumer_tag) 1801 assert not messages, messages 1802 1803 return [] 1804 finally: 1805 # NOTE: The entry could be purged if channel or connection closes 1806 if consumer_tag in self._consumer_infos: 1807 del self._consumer_infos[consumer_tag] 1808 # Schedule termination of connection.process_data_events using a 1809 # negative channel number 1810 self.connection._request_channel_dispatch(-self.channel_number) 1811 1812 def _remove_pending_deliveries(self, consumer_tag): 1813 """Extract _ConsumerDeliveryEvt objects destined for the given consumer 1814 from pending events, discarding the _ConsumerCancellationEvt, if any 1815 1816 :param str consumer_tag: 1817 1818 :returns: a (possibly empty) sequence of _ConsumerDeliveryEvt destined 1819 for the given consumer tag 1820 :rtype: list 1821 """ 1822 remaining_events = deque() 1823 unprocessed_messages = [] 1824 while self._pending_events: 1825 evt = self._pending_events.popleft() 1826 if type(evt) is _ConsumerDeliveryEvt: # pylint: disable=C0123 1827 if evt.method.consumer_tag == consumer_tag: 1828 unprocessed_messages.append(evt) 1829 continue 1830 if type(evt) is _ConsumerCancellationEvt: # pylint: disable=C0123 1831 if evt.method_frame.method.consumer_tag == consumer_tag: 1832 # A broker-initiated Basic.Cancel must have arrived 1833 # before our cancel request completed 1834 continue 1835 1836 remaining_events.append(evt) 1837 1838 self._pending_events = remaining_events 1839 1840 return unprocessed_messages 1841 1842 def start_consuming(self): 1843 """Processes I/O events and dispatches timers and `basic_consume` 1844 callbacks until all consumers are cancelled. 1845 1846 NOTE: this blocking function may not be called from the scope of a 1847 pika callback, because dispatching `basic_consume` callbacks from this 1848 context would constitute recursion. 1849 1850 :raises pika.exceptions.ReentrancyError: if called from the scope of a 1851 `BlockingConnection` or `BlockingChannel` callback 1852 :raises ChannelClosed: when this channel is closed by broker. 1853 """ 1854 # Check if called from the scope of an event dispatch callback 1855 with self.connection._acquire_event_dispatch() as dispatch_allowed: 1856 if not dispatch_allowed: 1857 raise exceptions.ReentrancyError( 1858 'start_consuming may not be called from the scope of ' 1859 'another BlockingConnection or BlockingChannel callback') 1860 1861 self._impl._raise_if_not_open() 1862 1863 # Process events as long as consumers exist on this channel 1864 while self._consumer_infos: 1865 # This will raise ChannelClosed if channel is closed by broker 1866 self._process_data_events(time_limit=None) 1867 1868 def stop_consuming(self, consumer_tag=None): 1869 """ Cancels all consumers, signalling the `start_consuming` loop to 1870 exit. 1871 1872 NOTE: pending non-ackable messages will be lost; pending ackable 1873 messages will be rejected. 1874 1875 """ 1876 if consumer_tag: 1877 self.basic_cancel(consumer_tag) 1878 else: 1879 self._cancel_all_consumers() 1880 1881 def consume(self, 1882 queue, 1883 auto_ack=False, 1884 exclusive=False, 1885 arguments=None, 1886 inactivity_timeout=None): 1887 """Blocking consumption of a queue instead of via a callback. This 1888 method is a generator that yields each message as a tuple of method, 1889 properties, and body. The active generator iterator terminates when the 1890 consumer is cancelled by client via `BlockingChannel.cancel()` or by 1891 broker. 1892 1893 Example: 1894 1895 for method, properties, body in channel.consume('queue'): 1896 print body 1897 channel.basic_ack(method.delivery_tag) 1898 1899 You should call `BlockingChannel.cancel()` when you escape out of the 1900 generator loop. 1901 1902 If you don't cancel this consumer, then next call on the same channel 1903 to `consume()` with the exact same (queue, auto_ack, exclusive) parameters 1904 will resume the existing consumer generator; however, calling with 1905 different parameters will result in an exception. 1906 1907 :param str queue: The queue name to consume 1908 :param bool auto_ack: Tell the broker to not expect a ack/nack response 1909 :param bool exclusive: Don't allow other consumers on the queue 1910 :param dict arguments: Custom key/value pair arguments for the consumer 1911 :param float inactivity_timeout: if a number is given (in 1912 seconds), will cause the method to yield (None, None, None) after the 1913 given period of inactivity; this permits for pseudo-regular maintenance 1914 activities to be carried out by the user while waiting for messages 1915 to arrive. If None is given (default), then the method blocks until 1916 the next event arrives. NOTE that timing granularity is limited by 1917 the timer resolution of the underlying implementation. 1918 NEW in pika 0.10.0. 1919 1920 :yields: tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode) 1921 1922 :raises ValueError: if consumer-creation parameters don't match those 1923 of the existing queue consumer generator, if any. 1924 NEW in pika 0.10.0 1925 :raises ChannelClosed: when this channel is closed by broker. 1926 1927 """ 1928 self._impl._raise_if_not_open() 1929 1930 params = (queue, auto_ack, exclusive) 1931 1932 if self._queue_consumer_generator is not None: 1933 if params != self._queue_consumer_generator.params: 1934 raise ValueError( 1935 'Consume with different params not allowed on existing ' 1936 'queue consumer generator; previous params: %r; ' 1937 'new params: %r' % (self._queue_consumer_generator.params, 1938 (queue, auto_ack, exclusive))) 1939 else: 1940 LOGGER.debug('Creating new queue consumer generator; params: %r', 1941 params) 1942 # Need a consumer tag to register consumer info before sending 1943 # request to broker, because I/O might pick up incoming messages 1944 # in addition to Basic.Consume-ok 1945 consumer_tag = self._impl._generate_consumer_tag() 1946 1947 self._queue_consumer_generator = _QueueConsumerGeneratorInfo( 1948 params, consumer_tag) 1949 1950 try: 1951 self._basic_consume_impl( 1952 queue=queue, 1953 auto_ack=auto_ack, 1954 exclusive=exclusive, 1955 consumer_tag=consumer_tag, 1956 arguments=arguments, 1957 alternate_event_sink=self._on_consumer_generator_event) 1958 except Exception: 1959 self._queue_consumer_generator = None 1960 raise 1961 1962 LOGGER.info('Created new queue consumer generator %r', 1963 self._queue_consumer_generator) 1964 1965 while self._queue_consumer_generator is not None: 1966 # Process pending events 1967 if self._queue_consumer_generator.pending_events: 1968 evt = self._queue_consumer_generator.pending_events.popleft() 1969 if type(evt) is _ConsumerCancellationEvt: # pylint: disable=C0123 1970 # Consumer was cancelled by broker 1971 self._queue_consumer_generator = None 1972 break 1973 else: 1974 yield (evt.method, evt.properties, evt.body) 1975 continue 1976 1977 if inactivity_timeout is None: 1978 # Wait indefinitely for a message to arrive, while processing 1979 # I/O events and triggering ChannelClosed exception when the 1980 # channel fails 1981 self._process_data_events(time_limit=None) 1982 continue 1983 1984 # Wait with inactivity timeout 1985 wait_start_time = compat.time_now() 1986 wait_deadline = wait_start_time + inactivity_timeout 1987 delta = inactivity_timeout 1988 1989 while (self._queue_consumer_generator is not None and 1990 not self._queue_consumer_generator.pending_events): 1991 1992 self._process_data_events(time_limit=delta) 1993 1994 if not self._queue_consumer_generator: 1995 # Consumer was cancelled by client 1996 break 1997 1998 if self._queue_consumer_generator.pending_events: 1999 # Got message(s) 2000 break 2001 2002 delta = wait_deadline - compat.time_now() 2003 if delta <= 0.0: 2004 # Signal inactivity timeout 2005 yield (None, None, None) 2006 break 2007 2008 def _process_data_events(self, time_limit): 2009 """Wrapper for `BlockingConnection.process_data_events()` with common 2010 channel-specific logic that raises ChannelClosed if broker closed this 2011 channel. 2012 2013 NOTE: We need to raise an exception in the context of user's call into 2014 our API to protect the integrity of the underlying implementation. 2015 BlockingConnection's underlying asynchronous connection adapter 2016 (SelectConnection) uses callbacks to communicate with us. If 2017 BlockingConnection leaks exceptions back into the I/O loop or the 2018 asynchronous connection adapter, we interrupt their normal workflow and 2019 introduce a high likelihood of state inconsistency. 2020 2021 See `BlockingConnection.process_data_events()` for documentation of args 2022 and behavior. 2023 2024 :param float time_limit: 2025 2026 """ 2027 self.connection.process_data_events(time_limit=time_limit) 2028 if self.is_closed and isinstance(self._closing_reason, 2029 exceptions.ChannelClosedByBroker): 2030 LOGGER.debug('Channel close by broker detected, raising %r; %r', 2031 self._closing_reason, self) 2032 raise self._closing_reason # pylint: disable=E0702 2033 2034 def get_waiting_message_count(self): 2035 """Returns the number of messages that may be retrieved from the current 2036 queue consumer generator via `BlockingChannel.consume` without blocking. 2037 NEW in pika 0.10.0 2038 2039 :returns: The number of waiting messages 2040 :rtype: int 2041 """ 2042 if self._queue_consumer_generator is not None: 2043 pending_events = self._queue_consumer_generator.pending_events 2044 count = len(pending_events) 2045 if count and type(pending_events[-1]) is _ConsumerCancellationEvt: # pylint: disable=C0123 2046 count -= 1 2047 else: 2048 count = 0 2049 2050 return count 2051 2052 def cancel(self): 2053 """Cancel the queue consumer created by `BlockingChannel.consume`, 2054 rejecting all pending ackable messages. 2055 2056 NOTE: If you're looking to cancel a consumer issued with 2057 BlockingChannel.basic_consume then you should call 2058 BlockingChannel.basic_cancel. 2059 2060 :returns: The number of messages requeued by Basic.Nack. 2061 NEW in 0.10.0: returns 0 2062 :rtype: int 2063 2064 """ 2065 if self._queue_consumer_generator is None: 2066 LOGGER.warning('cancel: queue consumer generator is inactive ' 2067 '(already cancelled by client or broker?)') 2068 return 0 2069 2070 try: 2071 _, auto_ack, _ = self._queue_consumer_generator.params 2072 if not auto_ack: 2073 # Reject messages held by queue consumer generator; NOTE: we 2074 # can't use basic_nack with the multiple option to avoid nacking 2075 # messages already held by our client. 2076 pending_events = self._queue_consumer_generator.pending_events 2077 # NOTE `get_waiting_message_count` adjusts for `Basic.Cancel` 2078 # from the server at the end (if any) 2079 for _ in compat.xrange(self.get_waiting_message_count()): 2080 evt = pending_events.popleft() 2081 self._impl.basic_reject( 2082 evt.method.delivery_tag, requeue=True) 2083 2084 self.basic_cancel(self._queue_consumer_generator.consumer_tag) 2085 finally: 2086 self._queue_consumer_generator = None 2087 2088 # Return 0 for compatibility with legacy implementation; the number of 2089 # nacked messages is not meaningful since only messages consumed with 2090 # auto_ack=False may be nacked, and those arriving after calling 2091 # basic_cancel will be rejected automatically by impl channel, so we'll 2092 # never know how many of those were nacked. 2093 return 0 2094 2095 def basic_ack(self, delivery_tag=0, multiple=False): 2096 """Acknowledge one or more messages. When sent by the client, this 2097 method acknowledges one or more messages delivered via the Deliver or 2098 Get-Ok methods. When sent by server, this method acknowledges one or 2099 more messages published with the Publish method on a channel in 2100 confirm mode. The acknowledgement can be for a single message or a 2101 set of messages up to and including a specific message. 2102 2103 :param int delivery-tag: The server-assigned delivery tag 2104 :param bool multiple: If set to True, the delivery tag is treated as 2105 "up to and including", so that multiple messages 2106 can be acknowledged with a single method. If set 2107 to False, the delivery tag refers to a single 2108 message. If the multiple field is 1, and the 2109 delivery tag is zero, this indicates 2110 acknowledgement of all outstanding messages. 2111 """ 2112 self._impl.basic_ack(delivery_tag=delivery_tag, multiple=multiple) 2113 self._flush_output() 2114 2115 def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): 2116 """This method allows a client to reject one or more incoming messages. 2117 It can be used to interrupt and cancel large incoming messages, or 2118 return untreatable messages to their original queue. 2119 2120 :param int delivery-tag: The server-assigned delivery tag 2121 :param bool multiple: If set to True, the delivery tag is treated as 2122 "up to and including", so that multiple messages 2123 can be acknowledged with a single method. If set 2124 to False, the delivery tag refers to a single 2125 message. If the multiple field is 1, and the 2126 delivery tag is zero, this indicates 2127 acknowledgement of all outstanding messages. 2128 :param bool requeue: If requeue is true, the server will attempt to 2129 requeue the message. If requeue is false or the 2130 requeue attempt fails the messages are discarded or 2131 dead-lettered. 2132 2133 """ 2134 self._impl.basic_nack( 2135 delivery_tag=delivery_tag, multiple=multiple, requeue=requeue) 2136 self._flush_output() 2137 2138 def basic_get(self, queue, auto_ack=False): 2139 """Get a single message from the AMQP broker. Returns a sequence with 2140 the method frame, message properties, and body. 2141 2142 :param str queue: Name of queue from which to get a message 2143 :param bool auto_ack: Tell the broker to not expect a reply 2144 :returns: a three-tuple; (None, None, None) if the queue was empty; 2145 otherwise (method, properties, body); NOTE: body may be None 2146 :rtype: (spec.Basic.GetOk|None, spec.BasicProperties|None, str|None) 2147 """ 2148 assert not self._basic_getempty_result 2149 2150 validators.require_string(queue, 'queue') 2151 2152 # NOTE: nested with for python 2.6 compatibility 2153 with _CallbackResult(self._RxMessageArgs) as get_ok_result: 2154 with self._basic_getempty_result: 2155 self._impl.basic_get( 2156 queue=queue, 2157 auto_ack=auto_ack, 2158 callback=get_ok_result.set_value_once) 2159 self._flush_output(get_ok_result.is_ready, 2160 self._basic_getempty_result.is_ready) 2161 if get_ok_result: 2162 evt = get_ok_result.value 2163 return evt.method, evt.properties, evt.body 2164 else: 2165 assert self._basic_getempty_result, ( 2166 "wait completed without GetOk and GetEmpty") 2167 return None, None, None 2168 2169 def basic_publish(self, 2170 exchange, 2171 routing_key, 2172 body, 2173 properties=None, 2174 mandatory=False): 2175 """Publish to the channel with the given exchange, routing key, and 2176 body. 2177 2178 For more information on basic_publish and what the parameters do, see: 2179 2180 http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish 2181 2182 NOTE: mandatory may be enabled even without delivery 2183 confirmation, but in the absence of delivery confirmation the 2184 synchronous implementation has no way to know how long to wait for 2185 the Basic.Return. 2186 2187 :param str exchange: The exchange to publish to 2188 :param str routing_key: The routing key to bind on 2189 :param bytes body: The message body; empty string if no body 2190 :param pika.spec.BasicProperties properties: message properties 2191 :param bool mandatory: The mandatory flag 2192 2193 :raises UnroutableError: raised when a message published in 2194 publisher-acknowledgments mode (see 2195 `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` 2196 followed by `Basic.Ack`. 2197 :raises NackError: raised when a message published in 2198 publisher-acknowledgements mode is Nack'ed by the broker. See 2199 `BlockingChannel.confirm_delivery`. 2200 2201 """ 2202 if self._delivery_confirmation: 2203 # In publisher-acknowledgments mode 2204 with self._message_confirmation_result: 2205 self._impl.basic_publish( 2206 exchange=exchange, 2207 routing_key=routing_key, 2208 body=body, 2209 properties=properties, 2210 mandatory=mandatory) 2211 2212 self._flush_output(self._message_confirmation_result.is_ready) 2213 conf_method = ( 2214 self._message_confirmation_result.value.method_frame.method) 2215 2216 if isinstance(conf_method, pika.spec.Basic.Nack): 2217 # Broker was unable to process the message due to internal 2218 # error 2219 LOGGER.warning( 2220 "Message was Nack'ed by broker: nack=%r; channel=%s; " 2221 "exchange=%s; routing_key=%s; mandatory=%r; ", 2222 conf_method, self.channel_number, exchange, routing_key, 2223 mandatory) 2224 if self._puback_return is not None: 2225 returned_messages = [self._puback_return] 2226 self._puback_return = None 2227 else: 2228 returned_messages = [] 2229 raise exceptions.NackError(returned_messages) 2230 2231 else: 2232 assert isinstance(conf_method, 2233 pika.spec.Basic.Ack), (conf_method) 2234 2235 if self._puback_return is not None: 2236 # Unroutable message was returned 2237 messages = [self._puback_return] 2238 self._puback_return = None 2239 raise exceptions.UnroutableError(messages) 2240 else: 2241 # In non-publisher-acknowledgments mode 2242 self._impl.basic_publish( 2243 exchange=exchange, 2244 routing_key=routing_key, 2245 body=body, 2246 properties=properties, 2247 mandatory=mandatory) 2248 self._flush_output() 2249 2250 def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False): 2251 """Specify quality of service. This method requests a specific quality 2252 of service. The QoS can be specified for the current channel or for all 2253 channels on the connection. The client can request that messages be sent 2254 in advance so that when the client finishes processing a message, the 2255 following message is already held locally, rather than needing to be 2256 sent down the channel. Prefetching gives a performance improvement. 2257 2258 :param int prefetch_size: This field specifies the prefetch window 2259 size. The server will send a message in 2260 advance if it is equal to or smaller in size 2261 than the available prefetch size (and also 2262 falls into other prefetch limits). May be set 2263 to zero, meaning "no specific limit", 2264 although other prefetch limits may still 2265 apply. The prefetch-size is ignored if the 2266 no-ack option is set in the consumer. 2267 :param int prefetch_count: Specifies a prefetch window in terms of whole 2268 messages. This field may be used in 2269 combination with the prefetch-size field; a 2270 message will only be sent in advance if both 2271 prefetch windows (and those at the channel 2272 and connection level) allow it. The 2273 prefetch-count is ignored if the no-ack 2274 option is set in the consumer. 2275 :param bool global_qos: Should the QoS apply to all channels on the 2276 connection. 2277 2278 """ 2279 with _CallbackResult() as qos_ok_result: 2280 self._impl.basic_qos( 2281 callback=qos_ok_result.signal_once, 2282 prefetch_size=prefetch_size, 2283 prefetch_count=prefetch_count, 2284 global_qos=global_qos) 2285 self._flush_output(qos_ok_result.is_ready) 2286 2287 def basic_recover(self, requeue=False): 2288 """This method asks the server to redeliver all unacknowledged messages 2289 on a specified channel. Zero or more messages may be redelivered. This 2290 method replaces the asynchronous Recover. 2291 2292 :param bool requeue: If False, the message will be redelivered to the 2293 original recipient. If True, the server will 2294 attempt to requeue the message, potentially then 2295 delivering it to an alternative subscriber. 2296 2297 """ 2298 with _CallbackResult() as recover_ok_result: 2299 self._impl.basic_recover( 2300 requeue=requeue, callback=recover_ok_result.signal_once) 2301 self._flush_output(recover_ok_result.is_ready) 2302 2303 def basic_reject(self, delivery_tag=None, requeue=True): 2304 """Reject an incoming message. This method allows a client to reject a 2305 message. It can be used to interrupt and cancel large incoming messages, 2306 or return untreatable messages to their original queue. 2307 2308 :param int delivery-tag: The server-assigned delivery tag 2309 :param bool requeue: If requeue is true, the server will attempt to 2310 requeue the message. If requeue is false or the 2311 requeue attempt fails the messages are discarded or 2312 dead-lettered. 2313 2314 """ 2315 self._impl.basic_reject(delivery_tag=delivery_tag, requeue=requeue) 2316 self._flush_output() 2317 2318 def confirm_delivery(self): 2319 """Turn on RabbitMQ-proprietary Confirm mode in the channel. 2320 2321 For more information see: 2322 https://www.rabbitmq.com/confirms.html 2323 """ 2324 if self._delivery_confirmation: 2325 LOGGER.error( 2326 'confirm_delivery: confirmation was already enabled ' 2327 'on channel=%s', self.channel_number) 2328 return 2329 2330 with _CallbackResult() as select_ok_result: 2331 self._impl.confirm_delivery( 2332 ack_nack_callback=self._message_confirmation_result. 2333 set_value_once, 2334 callback=select_ok_result.signal_once) 2335 2336 self._flush_output(select_ok_result.is_ready) 2337 2338 self._delivery_confirmation = True 2339 2340 # Unroutable messages returned after this point will be in the context 2341 # of publisher acknowledgments 2342 self._impl.add_on_return_callback(self._on_puback_message_returned) 2343 2344 def exchange_declare(self, 2345 exchange, 2346 exchange_type='direct', 2347 passive=False, 2348 durable=False, 2349 auto_delete=False, 2350 internal=False, 2351 arguments=None): 2352 """This method creates an exchange if it does not already exist, and if 2353 the exchange exists, verifies that it is of the correct and expected 2354 class. 2355 2356 If passive set, the server will reply with Declare-Ok if the exchange 2357 already exists with the same name, and raise an error if not and if the 2358 exchange does not already exist, the server MUST raise a channel 2359 exception with reply code 404 (not found). 2360 2361 :param str exchange: The exchange name consists of a non-empty sequence of 2362 these characters: letters, digits, hyphen, underscore, 2363 period, or colon. 2364 :param str exchange_type: The exchange type to use 2365 :param bool passive: Perform a declare or just check to see if it exists 2366 :param bool durable: Survive a reboot of RabbitMQ 2367 :param bool auto_delete: Remove when no more queues are bound to it 2368 :param bool internal: Can only be published to by other exchanges 2369 :param dict arguments: Custom key/value pair arguments for the exchange 2370 :returns: Method frame from the Exchange.Declare-ok response 2371 :rtype: `pika.frame.Method` having `method` attribute of type 2372 `spec.Exchange.DeclareOk` 2373 2374 """ 2375 validators.require_string(exchange, 'exchange') 2376 with _CallbackResult( 2377 self._MethodFrameCallbackResultArgs) as declare_ok_result: 2378 self._impl.exchange_declare( 2379 exchange=exchange, 2380 exchange_type=exchange_type, 2381 passive=passive, 2382 durable=durable, 2383 auto_delete=auto_delete, 2384 internal=internal, 2385 arguments=arguments, 2386 callback=declare_ok_result.set_value_once) 2387 2388 self._flush_output(declare_ok_result.is_ready) 2389 return declare_ok_result.value.method_frame 2390 2391 def exchange_delete(self, exchange=None, if_unused=False): 2392 """Delete the exchange. 2393 2394 :param str exchange: The exchange name 2395 :param bool if_unused: only delete if the exchange is unused 2396 :returns: Method frame from the Exchange.Delete-ok response 2397 :rtype: `pika.frame.Method` having `method` attribute of type 2398 `spec.Exchange.DeleteOk` 2399 2400 """ 2401 with _CallbackResult( 2402 self._MethodFrameCallbackResultArgs) as delete_ok_result: 2403 self._impl.exchange_delete( 2404 exchange=exchange, 2405 if_unused=if_unused, 2406 callback=delete_ok_result.set_value_once) 2407 2408 self._flush_output(delete_ok_result.is_ready) 2409 return delete_ok_result.value.method_frame 2410 2411 def exchange_bind(self, destination, source, routing_key='', 2412 arguments=None): 2413 """Bind an exchange to another exchange. 2414 2415 :param str destination: The destination exchange to bind 2416 :param str source: The source exchange to bind to 2417 :param str routing_key: The routing key to bind on 2418 :param dict arguments: Custom key/value pair arguments for the binding 2419 :returns: Method frame from the Exchange.Bind-ok response 2420 :rtype: `pika.frame.Method` having `method` attribute of type 2421 `spec.Exchange.BindOk` 2422 2423 """ 2424 validators.require_string(destination, 'destination') 2425 validators.require_string(source, 'source') 2426 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2427 bind_ok_result: 2428 self._impl.exchange_bind( 2429 destination=destination, 2430 source=source, 2431 routing_key=routing_key, 2432 arguments=arguments, 2433 callback=bind_ok_result.set_value_once) 2434 2435 self._flush_output(bind_ok_result.is_ready) 2436 return bind_ok_result.value.method_frame 2437 2438 def exchange_unbind(self, 2439 destination=None, 2440 source=None, 2441 routing_key='', 2442 arguments=None): 2443 """Unbind an exchange from another exchange. 2444 2445 :param str destination: The destination exchange to unbind 2446 :param str source: The source exchange to unbind from 2447 :param str routing_key: The routing key to unbind 2448 :param dict arguments: Custom key/value pair arguments for the binding 2449 :returns: Method frame from the Exchange.Unbind-ok response 2450 :rtype: `pika.frame.Method` having `method` attribute of type 2451 `spec.Exchange.UnbindOk` 2452 2453 """ 2454 with _CallbackResult( 2455 self._MethodFrameCallbackResultArgs) as unbind_ok_result: 2456 self._impl.exchange_unbind( 2457 destination=destination, 2458 source=source, 2459 routing_key=routing_key, 2460 arguments=arguments, 2461 callback=unbind_ok_result.set_value_once) 2462 2463 self._flush_output(unbind_ok_result.is_ready) 2464 return unbind_ok_result.value.method_frame 2465 2466 def queue_declare(self, 2467 queue, 2468 passive=False, 2469 durable=False, 2470 exclusive=False, 2471 auto_delete=False, 2472 arguments=None): 2473 """Declare queue, create if needed. This method creates or checks a 2474 queue. When creating a new queue the client can specify various 2475 properties that control the durability of the queue and its contents, 2476 and the level of sharing for the queue. 2477 2478 Use an empty string as the queue name for the broker to auto-generate 2479 one. Retrieve this auto-generated queue name from the returned 2480 `spec.Queue.DeclareOk` method frame. 2481 2482 :param str queue: The queue name; if empty string, the broker will 2483 create a unique queue name 2484 :param bool passive: Only check to see if the queue exists and raise 2485 `ChannelClosed` if it doesn't 2486 :param bool durable: Survive reboots of the broker 2487 :param bool exclusive: Only allow access by the current connection 2488 :param bool auto_delete: Delete after consumer cancels or disconnects 2489 :param dict arguments: Custom key/value arguments for the queue 2490 :returns: Method frame from the Queue.Declare-ok response 2491 :rtype: `pika.frame.Method` having `method` attribute of type 2492 `spec.Queue.DeclareOk` 2493 2494 """ 2495 validators.require_string(queue, 'queue') 2496 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2497 declare_ok_result: 2498 self._impl.queue_declare( 2499 queue=queue, 2500 passive=passive, 2501 durable=durable, 2502 exclusive=exclusive, 2503 auto_delete=auto_delete, 2504 arguments=arguments, 2505 callback=declare_ok_result.set_value_once) 2506 2507 self._flush_output(declare_ok_result.is_ready) 2508 return declare_ok_result.value.method_frame 2509 2510 def queue_delete(self, queue, if_unused=False, if_empty=False): 2511 """Delete a queue from the broker. 2512 2513 :param str queue: The queue to delete 2514 :param bool if_unused: only delete if it's unused 2515 :param bool if_empty: only delete if the queue is empty 2516 :returns: Method frame from the Queue.Delete-ok response 2517 :rtype: `pika.frame.Method` having `method` attribute of type 2518 `spec.Queue.DeleteOk` 2519 2520 """ 2521 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2522 delete_ok_result: 2523 self._impl.queue_delete( 2524 queue=queue, 2525 if_unused=if_unused, 2526 if_empty=if_empty, 2527 callback=delete_ok_result.set_value_once) 2528 2529 self._flush_output(delete_ok_result.is_ready) 2530 return delete_ok_result.value.method_frame 2531 2532 def queue_purge(self, queue): 2533 """Purge all of the messages from the specified queue 2534 2535 :param str queue: The queue to purge 2536 :returns: Method frame from the Queue.Purge-ok response 2537 :rtype: `pika.frame.Method` having `method` attribute of type 2538 `spec.Queue.PurgeOk` 2539 2540 """ 2541 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2542 purge_ok_result: 2543 self._impl.queue_purge( 2544 queue=queue, callback=purge_ok_result.set_value_once) 2545 self._flush_output(purge_ok_result.is_ready) 2546 return purge_ok_result.value.method_frame 2547 2548 def queue_bind(self, queue, exchange, routing_key=None, arguments=None): 2549 """Bind the queue to the specified exchange 2550 2551 :param str queue: The queue to bind to the exchange 2552 :param str exchange: The source exchange to bind to 2553 :param str routing_key: The routing key to bind on 2554 :param dict arguments: Custom key/value pair arguments for the binding 2555 2556 :returns: Method frame from the Queue.Bind-ok response 2557 :rtype: `pika.frame.Method` having `method` attribute of type 2558 `spec.Queue.BindOk` 2559 2560 """ 2561 validators.require_string(queue, 'queue') 2562 validators.require_string(exchange, 'exchange') 2563 with _CallbackResult( 2564 self._MethodFrameCallbackResultArgs) as bind_ok_result: 2565 self._impl.queue_bind( 2566 queue=queue, 2567 exchange=exchange, 2568 routing_key=routing_key, 2569 arguments=arguments, 2570 callback=bind_ok_result.set_value_once) 2571 self._flush_output(bind_ok_result.is_ready) 2572 return bind_ok_result.value.method_frame 2573 2574 def queue_unbind(self, 2575 queue, 2576 exchange=None, 2577 routing_key=None, 2578 arguments=None): 2579 """Unbind a queue from an exchange. 2580 2581 :param str queue: The queue to unbind from the exchange 2582 :param str exchange: The source exchange to bind from 2583 :param str routing_key: The routing key to unbind 2584 :param dict arguments: Custom key/value pair arguments for the binding 2585 2586 :returns: Method frame from the Queue.Unbind-ok response 2587 :rtype: `pika.frame.Method` having `method` attribute of type 2588 `spec.Queue.UnbindOk` 2589 2590 """ 2591 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2592 unbind_ok_result: 2593 self._impl.queue_unbind( 2594 queue=queue, 2595 exchange=exchange, 2596 routing_key=routing_key, 2597 arguments=arguments, 2598 callback=unbind_ok_result.set_value_once) 2599 self._flush_output(unbind_ok_result.is_ready) 2600 return unbind_ok_result.value.method_frame 2601 2602 def tx_select(self): 2603 """Select standard transaction mode. This method sets the channel to use 2604 standard transactions. The client must use this method at least once on 2605 a channel before using the Commit or Rollback methods. 2606 2607 :returns: Method frame from the Tx.Select-ok response 2608 :rtype: `pika.frame.Method` having `method` attribute of type 2609 `spec.Tx.SelectOk` 2610 2611 """ 2612 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2613 select_ok_result: 2614 self._impl.tx_select(select_ok_result.set_value_once) 2615 2616 self._flush_output(select_ok_result.is_ready) 2617 return select_ok_result.value.method_frame 2618 2619 def tx_commit(self): 2620 """Commit a transaction. 2621 2622 :returns: Method frame from the Tx.Commit-ok response 2623 :rtype: `pika.frame.Method` having `method` attribute of type 2624 `spec.Tx.CommitOk` 2625 2626 """ 2627 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2628 commit_ok_result: 2629 self._impl.tx_commit(commit_ok_result.set_value_once) 2630 2631 self._flush_output(commit_ok_result.is_ready) 2632 return commit_ok_result.value.method_frame 2633 2634 def tx_rollback(self): 2635 """Rollback a transaction. 2636 2637 :returns: Method frame from the Tx.Commit-ok response 2638 :rtype: `pika.frame.Method` having `method` attribute of type 2639 `spec.Tx.CommitOk` 2640 2641 """ 2642 with _CallbackResult(self._MethodFrameCallbackResultArgs) as \ 2643 rollback_ok_result: 2644 self._impl.tx_rollback(rollback_ok_result.set_value_once) 2645 2646 self._flush_output(rollback_ok_result.is_ready) 2647 return rollback_ok_result.value.method_frame 2648