1"""blocking adapter test""" 2from datetime import datetime 3import functools 4import logging 5import socket 6import threading 7import unittest 8import uuid 9 10import pika 11from pika.adapters import blocking_connection 12from pika.compat import as_bytes, time_now 13import pika.connection 14import pika.exceptions 15 16from ..forward_server import ForwardServer 17from .test_utils import retry_assertion 18 19# too-many-lines 20# pylint: disable=C0302 21 22# Disable warning about access to protected member 23# pylint: disable=W0212 24 25# Disable warning Attribute defined outside __init__ 26# pylint: disable=W0201 27 28# Disable warning Missing docstring 29# pylint: disable=C0111 30 31# Disable warning Too many public methods 32# pylint: disable=R0904 33 34# Disable warning Invalid variable name 35# pylint: disable=C0103 36 37 38LOGGER = logging.getLogger(__name__) 39 40PARAMS_URL_TEMPLATE = ( 41 'amqp://guest:guest@127.0.0.1:%(port)s/%%2f?socket_timeout=1') 42DEFAULT_URL = PARAMS_URL_TEMPLATE % {'port': 5672} 43DEFAULT_PARAMS = pika.URLParameters(DEFAULT_URL) 44DEFAULT_TIMEOUT = 15 45 46 47 48def setUpModule(): 49 logging.basicConfig(level=logging.DEBUG) 50 51 52class BlockingTestCaseBase(unittest.TestCase): 53 54 TIMEOUT = DEFAULT_TIMEOUT 55 56 def _connect(self, 57 url=DEFAULT_URL, 58 connection_class=pika.BlockingConnection, 59 impl_class=None): 60 parameters = pika.URLParameters(url) 61 62 return self._connect_params(parameters, 63 connection_class, 64 impl_class) 65 66 def _connect_params(self, 67 parameters, 68 connection_class=pika.BlockingConnection, 69 impl_class=None): 70 71 connection = connection_class(parameters, _impl_class=impl_class) 72 self.addCleanup(lambda: connection.close() 73 if connection.is_open else None) 74 75 # We use impl's timer directly in order to get a callback regardless 76 # of BlockingConnection's event dispatch modality 77 connection._impl._adapter_call_later(self.TIMEOUT, # pylint: disable=E1101 78 self._on_test_timeout) 79 80 # Patch calls into I/O loop to fail test if exceptions are 81 # leaked back through SelectConnection or the I/O loop. 82 self._instrument_io_loop_exception_leak_detection(connection) 83 84 return connection 85 86 def _instrument_io_loop_exception_leak_detection(self, connection): 87 """Instrument the given connection to detect and fail test when 88 an exception is leaked through the I/O loop 89 90 NOTE: BlockingConnection's underlying asynchronous connection adapter 91 (SelectConnection) uses callbacks to communicate with its user ( 92 BlockingConnection in this case). If BlockingConnection leaks 93 exceptions back into the I/O loop or the asynchronous connection 94 adapter, we interrupt their normal workflow and introduce a high 95 likelihood of state inconsistency. 96 """ 97 # Patch calls into I/O loop to fail test if exceptions are 98 # leaked back through SelectConnection or the I/O loop. 99 real_poll = connection._impl.ioloop.poll 100 def my_poll(*args, **kwargs): 101 try: 102 return real_poll(*args, **kwargs) 103 except BaseException as exc: 104 self.fail('Unwanted exception leaked into asynchronous layer ' 105 'via ioloop.poll(): {!r}'.format(exc)) 106 107 connection._impl.ioloop.poll = my_poll 108 self.addCleanup(setattr, connection._impl.ioloop, 'poll', real_poll) 109 110 real_process_timeouts = connection._impl.ioloop.process_timeouts 111 def my_process_timeouts(*args, **kwargs): 112 try: 113 return real_process_timeouts(*args, **kwargs) 114 except AssertionError: 115 # Our test timeout logic and unit test assert* routines rely 116 # on being able to pass AssertionError 117 raise 118 except BaseException as exc: 119 self.fail('Unwanted exception leaked into asynchronous layer ' 120 'via ioloop.process_timeouts(): {!r}'.format(exc)) 121 122 connection._impl.ioloop.process_timeouts = my_process_timeouts 123 self.addCleanup(setattr, connection._impl.ioloop, 'process_timeouts', 124 real_process_timeouts) 125 126 def _on_test_timeout(self): 127 """Called when test times out""" 128 LOGGER.info('%s TIMED OUT (%s)', datetime.utcnow(), self) 129 self.fail('Test timed out') 130 131 @retry_assertion(TIMEOUT/2) 132 def _assert_exact_message_count_with_retries(self, 133 channel, 134 queue, 135 expected_count): 136 frame = channel.queue_declare(queue, passive=True) 137 self.assertEqual(frame.method.message_count, expected_count) 138 139 140class TestCreateAndCloseConnection(BlockingTestCaseBase): 141 142 def test(self): 143 """BlockingConnection: Create and close connection""" 144 connection = self._connect() 145 self.assertIsInstance(connection, pika.BlockingConnection) 146 self.assertTrue(connection.is_open) 147 self.assertFalse(connection.is_closed) 148 self.assertFalse(connection._impl.is_closing) 149 150 connection.close() 151 self.assertTrue(connection.is_closed) 152 self.assertFalse(connection.is_open) 153 self.assertFalse(connection._impl.is_closing) 154 155 156class TestCreateConnectionWithNoneSocketAndStackTimeouts(BlockingTestCaseBase): 157 158 def test(self): 159 """ BlockingConnection: create a connection with socket and stack timeouts both None 160 161 """ 162 params = pika.URLParameters(DEFAULT_URL) 163 params.socket_timeout = None 164 params.stack_timeout = None 165 166 with self._connect_params(params) as connection: 167 self.assertTrue(connection.is_open) 168 169 170class TestCreateConnectionFromTwoConfigsFirstUnreachable(BlockingTestCaseBase): 171 172 def test(self): 173 """ BlockingConnection: create a connection from two configs, first unreachable 174 175 """ 176 # Reserve a port for use in connect 177 sock = socket.socket() 178 self.addCleanup(sock.close) 179 180 sock.bind(('127.0.0.1', 0)) 181 182 port = sock.getsockname()[1] 183 184 sock.close() 185 186 bad_params = pika.URLParameters(PARAMS_URL_TEMPLATE % {"port": port}) 187 good_params = pika.URLParameters(DEFAULT_URL) 188 189 with self._connect_params([bad_params, good_params]) as connection: 190 self.assertNotEqual(connection._impl.params.port, bad_params.port) 191 self.assertEqual(connection._impl.params.port, good_params.port) 192 193 194class TestCreateConnectionFromTwoUnreachableConfigs(BlockingTestCaseBase): 195 196 def test(self): 197 """ BlockingConnection: creating a connection from two unreachable \ 198 configs raises AMQPConnectionError 199 200 """ 201 # Reserve a port for use in connect 202 sock = socket.socket() 203 self.addCleanup(sock.close) 204 205 sock.bind(('127.0.0.1', 0)) 206 207 port = sock.getsockname()[1] 208 209 sock.close() 210 211 bad_params = pika.URLParameters(PARAMS_URL_TEMPLATE % {"port": port}) 212 213 with self.assertRaises(pika.exceptions.AMQPConnectionError): 214 self._connect_params([bad_params, bad_params]) 215 216 217class TestMultiCloseConnectionRaisesWrongState(BlockingTestCaseBase): 218 219 def test(self): 220 """BlockingConnection: Close connection twice raises ConnectionWrongStateError""" 221 connection = self._connect() 222 self.assertIsInstance(connection, pika.BlockingConnection) 223 self.assertTrue(connection.is_open) 224 self.assertFalse(connection.is_closed) 225 self.assertFalse(connection._impl.is_closing) 226 227 connection.close() 228 self.assertTrue(connection.is_closed) 229 self.assertFalse(connection.is_open) 230 self.assertFalse(connection._impl.is_closing) 231 232 with self.assertRaises(pika.exceptions.ConnectionWrongStateError): 233 connection.close() 234 235 236class TestConnectionContextManagerClosesConnection(BlockingTestCaseBase): 237 def test(self): 238 """BlockingConnection: connection context manager closes connection""" 239 with self._connect() as connection: 240 self.assertIsInstance(connection, pika.BlockingConnection) 241 self.assertTrue(connection.is_open) 242 243 self.assertTrue(connection.is_closed) 244 245 246class TestConnectionContextManagerExitSurvivesClosedConnection(BlockingTestCaseBase): 247 def test(self): 248 """BlockingConnection: connection context manager exit survives closed connection""" 249 with self._connect() as connection: 250 self.assertTrue(connection.is_open) 251 connection.close() 252 self.assertTrue(connection.is_closed) 253 254 self.assertTrue(connection.is_closed) 255 256 257class TestConnectionContextManagerClosesConnectionAndPassesOriginalException(BlockingTestCaseBase): 258 def test(self): 259 """BlockingConnection: connection context manager closes connection and passes original exception""" # pylint: disable=C0301 260 class MyException(Exception): 261 pass 262 263 with self.assertRaises(MyException): 264 with self._connect() as connection: 265 self.assertTrue(connection.is_open) 266 267 raise MyException() 268 269 self.assertTrue(connection.is_closed) 270 271 272class TestConnectionContextManagerClosesConnectionAndPassesSystemException(BlockingTestCaseBase): 273 def test(self): 274 """BlockingConnection: connection context manager closes connection and passes system exception""" # pylint: disable=C0301 275 with self.assertRaises(SystemExit): 276 with self._connect() as connection: 277 self.assertTrue(connection.is_open) 278 raise SystemExit() 279 280 self.assertTrue(connection.is_closed) 281 282 283class TestLostConnectionResultsInIsClosedConnectionAndChannel(BlockingTestCaseBase): 284 def test(self): 285 connection = self._connect() 286 channel = connection.channel() 287 288 # Simulate the server dropping the socket connection 289 connection._impl._transport._sock.shutdown(socket.SHUT_RDWR) 290 291 with self.assertRaises(pika.exceptions.StreamLostError): 292 # Changing QoS should result in ConnectionClosed 293 channel.basic_qos() 294 295 # Now check is_open/is_closed on channel and connection 296 self.assertFalse(channel.is_open) 297 self.assertTrue(channel.is_closed) 298 self.assertFalse(connection.is_open) 299 self.assertTrue(connection.is_closed) 300 301 302class TestInvalidExchangeTypeRaisesConnectionClosed(BlockingTestCaseBase): 303 def test(self): 304 """BlockingConnection: ConnectionClosed raised when creating exchange with invalid type""" # pylint: disable=C0301 305 # This test exploits behavior specific to RabbitMQ whereby the broker 306 # closes the connection if an attempt is made to declare an exchange 307 # with an invalid exchange type 308 connection = self._connect() 309 ch = connection.channel() 310 311 exg_name = ("TestInvalidExchangeTypeRaisesConnectionClosed_" + 312 uuid.uuid1().hex) 313 314 with self.assertRaises(pika.exceptions.ConnectionClosed) as ex_cm: 315 # Attempt to create an exchange with invalid exchange type 316 ch.exchange_declare(exg_name, exchange_type='ZZwwInvalid') 317 318 self.assertEqual(ex_cm.exception.args[0], 503) 319 320 321class TestCreateAndCloseConnectionWithChannelAndConsumer(BlockingTestCaseBase): 322 323 def test(self): 324 """BlockingConnection: Create and close connection with channel and consumer""" # pylint: disable=C0301 325 connection = self._connect() 326 327 ch = connection.channel() 328 329 q_name = ( 330 'TestCreateAndCloseConnectionWithChannelAndConsumer_q' + 331 uuid.uuid1().hex) 332 333 body1 = 'a' * 1024 334 335 # Declare a new queue 336 ch.queue_declare(q_name, auto_delete=True) 337 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 338 339 # Publish the message to the queue by way of default exchange 340 ch.basic_publish(exchange='', routing_key=q_name, body=body1) 341 342 # Create a consumer that uses automatic ack mode 343 ch.basic_consume(q_name, lambda *x: None, auto_ack=True, 344 exclusive=False, arguments=None) 345 346 connection.close() 347 self.assertTrue(connection.is_closed) 348 self.assertFalse(connection.is_open) 349 self.assertFalse(connection._impl.is_closing) 350 351 self.assertFalse(connection._impl._channels) 352 353 self.assertFalse(ch._consumer_infos) 354 self.assertFalse(ch._impl._consumers) 355 356 357class TestUsingInvalidQueueArgument(BlockingTestCaseBase): 358 def test(self): 359 """BlockingConnection raises expected exception when invalid queue parameter is used 360 """ 361 connection = self._connect() 362 ch = connection.channel() 363 with self.assertRaises(TypeError): 364 ch.queue_declare(queue=[1, 2, 3]) 365 366 367class TestSuddenBrokerDisconnectBeforeChannel(BlockingTestCaseBase): 368 369 def test(self): 370 """BlockingConnection resets properly on TCP/IP drop during channel() 371 """ 372 with ForwardServer(remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port), 373 local_linger_args=(1, 0)) as fwd: 374 375 self.connection = self._connect( 376 PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]}) 377 378 # Once outside the context, the connection is broken 379 380 # BlockingConnection should raise ConnectionClosed 381 with self.assertRaises(pika.exceptions.StreamLostError): 382 self.connection.channel() 383 384 self.assertTrue(self.connection.is_closed) 385 self.assertFalse(self.connection.is_open) 386 self.assertIsNone(self.connection._impl._transport) 387 388 389class TestNoAccessToConnectionAfterConnectionLost(BlockingTestCaseBase): 390 391 def test(self): 392 """BlockingConnection no access file descriptor after StreamLostError 393 """ 394 with ForwardServer(remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port), 395 local_linger_args=(1, 0)) as fwd: 396 397 self.connection = self._connect( 398 PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]}) 399 400 # Once outside the context, the connection is broken 401 402 # BlockingConnection should raise ConnectionClosed 403 with self.assertRaises(pika.exceptions.StreamLostError): 404 self.connection.channel() 405 406 self.assertTrue(self.connection.is_closed) 407 self.assertFalse(self.connection.is_open) 408 self.assertIsNone(self.connection._impl._transport) 409 410 # Attempt to operate on the connection once again after ConnectionClosed 411 with self.assertRaises(pika.exceptions.ConnectionWrongStateError): 412 self.connection.channel() 413 414 415class TestConnectWithDownedBroker(BlockingTestCaseBase): 416 417 def test(self): 418 """ BlockingConnection to downed broker results in AMQPConnectionError 419 420 """ 421 # Reserve a port for use in connect 422 sock = socket.socket() 423 self.addCleanup(sock.close) 424 425 sock.bind(('127.0.0.1', 0)) 426 427 port = sock.getsockname()[1] 428 429 sock.close() 430 431 with self.assertRaises(pika.exceptions.AMQPConnectionError): 432 self.connection = self._connect( 433 PARAMS_URL_TEMPLATE % {"port": port}) 434 435 436class TestDisconnectDuringConnectionStart(BlockingTestCaseBase): 437 438 def test(self): 439 """ BlockingConnection TCP/IP connection loss in CONNECTION_START 440 """ 441 fwd = ForwardServer( 442 remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port), 443 local_linger_args=(1, 0)) 444 445 fwd.start() 446 self.addCleanup(lambda: fwd.stop() if fwd.running else None) 447 448 class MySelectConnection(pika.SelectConnection): 449 assert hasattr(pika.SelectConnection, '_on_connection_start') 450 451 def _on_connection_start(self, *args, **kwargs): # pylint: disable=W0221 452 fwd.stop() 453 return super(MySelectConnection, self)._on_connection_start( 454 *args, **kwargs) 455 456 with self.assertRaises(pika.exceptions.ProbableAuthenticationError): 457 self._connect( 458 PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]}, 459 impl_class=MySelectConnection) 460 461 462class TestDisconnectDuringConnectionTune(BlockingTestCaseBase): 463 464 def test(self): 465 """ BlockingConnection TCP/IP connection loss in CONNECTION_TUNE 466 """ 467 fwd = ForwardServer( 468 remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port), 469 local_linger_args=(1, 0)) 470 fwd.start() 471 self.addCleanup(lambda: fwd.stop() if fwd.running else None) 472 473 class MySelectConnection(pika.SelectConnection): 474 assert hasattr(pika.SelectConnection, '_on_connection_tune') 475 476 def _on_connection_tune(self, *args, **kwargs): # pylint: disable=W0221 477 fwd.stop() 478 return super(MySelectConnection, self)._on_connection_tune( 479 *args, **kwargs) 480 481 with self.assertRaises(pika.exceptions.ProbableAccessDeniedError): 482 self._connect( 483 PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]}, 484 impl_class=MySelectConnection) 485 486 487class TestDisconnectDuringConnectionProtocol(BlockingTestCaseBase): 488 489 def test(self): 490 """ BlockingConnection TCP/IP connection loss in CONNECTION_PROTOCOL 491 """ 492 fwd = ForwardServer( 493 remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port), 494 local_linger_args=(1, 0)) 495 496 fwd.start() 497 self.addCleanup(lambda: fwd.stop() if fwd.running else None) 498 499 class MySelectConnection(pika.SelectConnection): 500 assert hasattr(pika.SelectConnection, '_on_stream_connected') 501 502 def _on_stream_connected(self, *args, **kwargs): # pylint: disable=W0221 503 fwd.stop() 504 return super(MySelectConnection, self)._on_stream_connected( 505 *args, **kwargs) 506 507 with self.assertRaises(pika.exceptions.IncompatibleProtocolError): 508 self._connect(PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]}, 509 impl_class=MySelectConnection) 510 511 512class TestProcessDataEvents(BlockingTestCaseBase): 513 514 def test(self): 515 """BlockingConnection.process_data_events""" 516 connection = self._connect() 517 518 # Try with time_limit=0 519 start_time = time_now() 520 connection.process_data_events(time_limit=0) 521 elapsed = time_now() - start_time 522 self.assertLess(elapsed, 0.25) 523 524 # Try with time_limit=0.005 525 start_time = time_now() 526 connection.process_data_events(time_limit=0.005) 527 elapsed = time_now() - start_time 528 self.assertGreaterEqual(elapsed, 0.005) 529 self.assertLess(elapsed, 0.25) 530 531 532class TestConnectionRegisterForBlockAndUnblock(BlockingTestCaseBase): 533 534 def test(self): 535 """BlockingConnection register for Connection.Blocked/Unblocked""" 536 connection = self._connect() 537 538 # NOTE: I haven't figured out yet how to coerce RabbitMQ to emit 539 # Connection.Block and Connection.Unblock from the test, so we'll 540 # just call the registration functions for now and simulate incoming 541 # blocked/unblocked frames 542 543 blocked_buffer = [] 544 connection.add_on_connection_blocked_callback( 545 lambda conn, frame: blocked_buffer.append((conn, frame))) 546 # Simulate dispatch of blocked connection 547 blocked_frame = pika.frame.Method( 548 0, 549 pika.spec.Connection.Blocked('reason')) 550 connection._impl._process_frame(blocked_frame) 551 connection.sleep(0) # facilitate dispatch of pending events 552 self.assertEqual(len(blocked_buffer), 1) 553 conn, frame = blocked_buffer[0] 554 self.assertIs(conn, connection) 555 self.assertIs(frame, blocked_frame) 556 557 unblocked_buffer = [] 558 connection.add_on_connection_unblocked_callback( 559 lambda conn, frame: unblocked_buffer.append((conn, frame))) 560 # Simulate dispatch of unblocked connection 561 unblocked_frame = pika.frame.Method(0, pika.spec.Connection.Unblocked()) 562 connection._impl._process_frame(unblocked_frame) 563 connection.sleep(0) # facilitate dispatch of pending events 564 self.assertEqual(len(unblocked_buffer), 1) 565 conn, frame = unblocked_buffer[0] 566 self.assertIs(conn, connection) 567 self.assertIs(frame, unblocked_frame) 568 569 570class TestBlockedConnectionTimeout(BlockingTestCaseBase): 571 572 def test(self): 573 """BlockingConnection Connection.Blocked timeout """ 574 url = DEFAULT_URL + '&blocked_connection_timeout=0.001' 575 conn = self._connect(url=url) 576 577 # NOTE: I haven't figured out yet how to coerce RabbitMQ to emit 578 # Connection.Block and Connection.Unblock from the test, so we'll 579 # simulate it for now 580 581 # Simulate Connection.Blocked 582 conn._impl._on_connection_blocked( 583 conn._impl, 584 pika.frame.Method( 585 0, 586 pika.spec.Connection.Blocked('TestBlockedConnectionTimeout'))) 587 588 # Wait for connection teardown 589 with self.assertRaises(pika.exceptions.ConnectionBlockedTimeout): 590 while True: 591 conn.process_data_events(time_limit=1) 592 593 594class TestAddCallbackThreadsafeFromSameThread(BlockingTestCaseBase): 595 596 def test(self): 597 """BlockingConnection.add_callback_threadsafe from same thread""" 598 connection = self._connect() 599 600 # Test timer completion 601 start_time = time_now() 602 rx_callback = [] 603 connection.add_callback_threadsafe( 604 lambda: rx_callback.append(time_now())) 605 while not rx_callback: 606 connection.process_data_events(time_limit=None) 607 608 self.assertEqual(len(rx_callback), 1) 609 elapsed = time_now() - start_time 610 self.assertLess(elapsed, 0.25) 611 612 613class TestAddCallbackThreadsafeFromAnotherThread(BlockingTestCaseBase): 614 615 def test(self): 616 """BlockingConnection.add_callback_threadsafe from another thread""" 617 connection = self._connect() 618 619 # Test timer completion 620 start_time = time_now() 621 rx_callback = [] 622 timer = threading.Timer( 623 0, 624 functools.partial(connection.add_callback_threadsafe, 625 lambda: rx_callback.append(time_now()))) 626 self.addCleanup(timer.cancel) 627 timer.start() 628 while not rx_callback: 629 connection.process_data_events(time_limit=None) 630 631 self.assertEqual(len(rx_callback), 1) 632 elapsed = time_now() - start_time 633 self.assertLess(elapsed, 0.25) 634 635 636class TestAddCallbackThreadsafeOnClosedConnectionRaisesWrongState( 637 BlockingTestCaseBase): 638 639 def test(self): 640 """BlockingConnection.add_callback_threadsafe on closed connection raises ConnectionWrongStateError""" 641 connection = self._connect() 642 connection.close() 643 644 with self.assertRaises(pika.exceptions.ConnectionWrongStateError): 645 connection.add_callback_threadsafe(lambda: None) 646 647 648class TestAddTimeoutRemoveTimeout(BlockingTestCaseBase): 649 650 def test(self): 651 """BlockingConnection.call_later and remove_timeout""" 652 connection = self._connect() 653 654 # Test timer completion 655 start_time = time_now() 656 rx_callback = [] 657 timer_id = connection.call_later( 658 0.005, 659 lambda: rx_callback.append(time_now())) 660 while not rx_callback: 661 connection.process_data_events(time_limit=None) 662 663 self.assertEqual(len(rx_callback), 1) 664 elapsed = time_now() - start_time 665 self.assertLess(elapsed, 0.25) 666 667 # Test removing triggered timeout 668 connection.remove_timeout(timer_id) 669 670 671 # Test aborted timer 672 rx_callback = [] 673 timer_id = connection.call_later( 674 0.001, 675 lambda: rx_callback.append(time_now())) 676 connection.remove_timeout(timer_id) 677 connection.process_data_events(time_limit=0.1) 678 self.assertFalse(rx_callback) 679 680 # Make sure _TimerEvt repr doesn't crash 681 evt = blocking_connection._TimerEvt(lambda: None) 682 repr(evt) 683 684 685class TestViabilityOfMultipleTimeoutsWithSameDeadlineAndCallback(BlockingTestCaseBase): 686 687 def test(self): 688 """BlockingConnection viability of multiple timeouts with same deadline and callback""" 689 connection = self._connect() 690 691 rx_callback = [] 692 693 def callback(): 694 rx_callback.append(1) 695 696 timer1 = connection.call_later(0, callback) 697 timer2 = connection.call_later(0, callback) 698 699 self.assertIsNot(timer1, timer2) 700 701 connection.remove_timeout(timer1) 702 703 # Wait for second timer to fire 704 start_wait_time = time_now() 705 while not rx_callback and time_now() - start_wait_time < 0.25: 706 connection.process_data_events(time_limit=0.001) 707 708 self.assertListEqual(rx_callback, [1]) 709 710 711class TestRemoveTimeoutFromTimeoutCallback(BlockingTestCaseBase): 712 713 def test(self): 714 """BlockingConnection.remove_timeout from timeout callback""" 715 connection = self._connect() 716 717 # Test timer completion 718 timer_id1 = connection.call_later(5, lambda: 0/0) 719 720 rx_timer2 = [] 721 def on_timer2(): 722 connection.remove_timeout(timer_id1) 723 connection.remove_timeout(timer_id2) 724 rx_timer2.append(1) 725 726 timer_id2 = connection.call_later(0, on_timer2) 727 728 while not rx_timer2: 729 connection.process_data_events(time_limit=None) 730 731 self.assertFalse(connection._ready_events) 732 733 734class TestSleep(BlockingTestCaseBase): 735 736 def test(self): 737 """BlockingConnection.sleep""" 738 connection = self._connect() 739 740 # Try with duration=0 741 start_time = time_now() 742 connection.sleep(duration=0) 743 elapsed = time_now() - start_time 744 self.assertLess(elapsed, 0.25) 745 746 # Try with duration=0.005 747 start_time = time_now() 748 connection.sleep(duration=0.005) 749 elapsed = time_now() - start_time 750 self.assertGreaterEqual(elapsed, 0.005) 751 self.assertLess(elapsed, 0.25) 752 753 754class TestConnectionProperties(BlockingTestCaseBase): 755 756 def test(self): 757 """Test BlockingConnection properties""" 758 connection = self._connect() 759 760 self.assertTrue(connection.is_open) 761 self.assertFalse(connection._impl.is_closing) 762 self.assertFalse(connection.is_closed) 763 764 self.assertTrue(connection.basic_nack_supported) 765 self.assertTrue(connection.consumer_cancel_notify_supported) 766 self.assertTrue(connection.exchange_exchange_bindings_supported) 767 self.assertTrue(connection.publisher_confirms_supported) 768 769 connection.close() 770 self.assertFalse(connection.is_open) 771 self.assertFalse(connection._impl.is_closing) 772 self.assertTrue(connection.is_closed) 773 774 775 776class TestCreateAndCloseChannel(BlockingTestCaseBase): 777 778 def test(self): 779 """BlockingChannel: Create and close channel""" 780 connection = self._connect() 781 782 ch = connection.channel() 783 self.assertIsInstance(ch, blocking_connection.BlockingChannel) 784 self.assertTrue(ch.is_open) 785 self.assertFalse(ch.is_closed) 786 self.assertFalse(ch._impl.is_closing) 787 self.assertIs(ch.connection, connection) 788 789 ch.close() 790 self.assertTrue(ch.is_closed) 791 self.assertFalse(ch.is_open) 792 self.assertFalse(ch._impl.is_closing) 793 794 795class TestExchangeDeclareAndDelete(BlockingTestCaseBase): 796 797 def test(self): 798 """BlockingChannel: Test exchange_declare and exchange_delete""" 799 connection = self._connect() 800 801 ch = connection.channel() 802 803 name = "TestExchangeDeclareAndDelete_" + uuid.uuid1().hex 804 805 # Declare a new exchange 806 frame = ch.exchange_declare(name, exchange_type='direct') 807 self.addCleanup(connection.channel().exchange_delete, name) 808 809 self.assertIsInstance(frame.method, pika.spec.Exchange.DeclareOk) 810 811 # Check if it exists by declaring it passively 812 frame = ch.exchange_declare(name, passive=True) 813 self.assertIsInstance(frame.method, pika.spec.Exchange.DeclareOk) 814 815 # Delete the exchange 816 frame = ch.exchange_delete(name) 817 self.assertIsInstance(frame.method, pika.spec.Exchange.DeleteOk) 818 819 # Verify that it's been deleted 820 with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as cm: 821 ch.exchange_declare(name, passive=True) 822 823 self.assertEqual(cm.exception.args[0], 404) 824 825 826class TestExchangeBindAndUnbind(BlockingTestCaseBase): 827 828 def test(self): 829 """BlockingChannel: Test exchange_bind and exchange_unbind""" 830 connection = self._connect() 831 832 ch = connection.channel() 833 834 q_name = 'TestExchangeBindAndUnbind_q' + uuid.uuid1().hex 835 src_exg_name = 'TestExchangeBindAndUnbind_src_exg_' + uuid.uuid1().hex 836 dest_exg_name = 'TestExchangeBindAndUnbind_dest_exg_' + uuid.uuid1().hex 837 routing_key = 'TestExchangeBindAndUnbind' 838 839 # Place channel in publisher-acknowledgments mode so that we may test 840 # whether the queue is reachable by publishing with mandatory=True 841 res = ch.confirm_delivery() 842 self.assertIsNone(res) 843 844 # Declare both exchanges 845 ch.exchange_declare(src_exg_name, exchange_type='direct') 846 self.addCleanup(connection.channel().exchange_delete, src_exg_name) 847 ch.exchange_declare(dest_exg_name, exchange_type='direct') 848 self.addCleanup(connection.channel().exchange_delete, dest_exg_name) 849 850 # Declare a new queue 851 ch.queue_declare(q_name, auto_delete=True) 852 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 853 854 # Bind the queue to the destination exchange 855 ch.queue_bind(q_name, exchange=dest_exg_name, routing_key=routing_key) 856 857 # Verify that the queue is unreachable without exchange-exchange binding 858 with self.assertRaises(pika.exceptions.UnroutableError): 859 ch.basic_publish(src_exg_name, routing_key, body='', mandatory=True) 860 861 # Bind the exchanges 862 frame = ch.exchange_bind(destination=dest_exg_name, source=src_exg_name, 863 routing_key=routing_key) 864 self.assertIsInstance(frame.method, pika.spec.Exchange.BindOk) 865 866 # Publish a message via the source exchange 867 ch.basic_publish(src_exg_name, routing_key, body='TestExchangeBindAndUnbind', 868 mandatory=True) 869 870 # Check that the queue now has one message 871 self._assert_exact_message_count_with_retries(channel=ch, 872 queue=q_name, 873 expected_count=1) 874 875 # Unbind the exchanges 876 frame = ch.exchange_unbind(destination=dest_exg_name, 877 source=src_exg_name, 878 routing_key=routing_key) 879 self.assertIsInstance(frame.method, pika.spec.Exchange.UnbindOk) 880 881 # Verify that the queue is now unreachable via the source exchange 882 with self.assertRaises(pika.exceptions.UnroutableError): 883 ch.basic_publish(src_exg_name, routing_key, body='', mandatory=True) 884 885 886class TestQueueDeclareAndDelete(BlockingTestCaseBase): 887 888 def test(self): 889 """BlockingChannel: Test queue_declare and queue_delete""" 890 connection = self._connect() 891 892 ch = connection.channel() 893 894 q_name = 'TestQueueDeclareAndDelete_' + uuid.uuid1().hex 895 896 # Declare a new queue 897 frame = ch.queue_declare(q_name, auto_delete=True) 898 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 899 900 self.assertIsInstance(frame.method, pika.spec.Queue.DeclareOk) 901 902 # Check if it exists by declaring it passively 903 frame = ch.queue_declare(q_name, passive=True) 904 self.assertIsInstance(frame.method, pika.spec.Queue.DeclareOk) 905 906 # Delete the queue 907 frame = ch.queue_delete(q_name) 908 self.assertIsInstance(frame.method, pika.spec.Queue.DeleteOk) 909 910 # Verify that it's been deleted 911 with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as cm: 912 ch.queue_declare(q_name, passive=True) 913 914 self.assertEqual(cm.exception.args[0], 404) 915 916 917class TestPassiveQueueDeclareOfUnknownQueueRaisesChannelClosed( 918 BlockingTestCaseBase): 919 def test(self): 920 """BlockingChannel: ChannelClosed raised when passive-declaring unknown queue""" # pylint: disable=C0301 921 connection = self._connect() 922 ch = connection.channel() 923 924 q_name = ("TestPassiveQueueDeclareOfUnknownQueueRaisesChannelClosed_q_" 925 + uuid.uuid1().hex) 926 927 with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as ex_cm: 928 ch.queue_declare(q_name, passive=True) 929 930 self.assertEqual(ex_cm.exception.args[0], 404) 931 932 933class TestQueueBindAndUnbindAndPurge(BlockingTestCaseBase): 934 935 def test(self): 936 """BlockingChannel: Test queue_bind and queue_unbind""" 937 connection = self._connect() 938 939 ch = connection.channel() 940 941 q_name = 'TestQueueBindAndUnbindAndPurge_q' + uuid.uuid1().hex 942 exg_name = 'TestQueueBindAndUnbindAndPurge_exg_' + uuid.uuid1().hex 943 routing_key = 'TestQueueBindAndUnbindAndPurge' 944 945 # Place channel in publisher-acknowledgments mode so that we may test 946 # whether the queue is reachable by publishing with mandatory=True 947 res = ch.confirm_delivery() 948 self.assertIsNone(res) 949 950 # Declare a new exchange 951 ch.exchange_declare(exg_name, exchange_type='direct') 952 self.addCleanup(connection.channel().exchange_delete, exg_name) 953 954 # Declare a new queue 955 ch.queue_declare(q_name, auto_delete=True) 956 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 957 958 # Bind the queue to the exchange using routing key 959 frame = ch.queue_bind(q_name, exchange=exg_name, 960 routing_key=routing_key) 961 self.assertIsInstance(frame.method, pika.spec.Queue.BindOk) 962 963 # Check that the queue is empty 964 frame = ch.queue_declare(q_name, passive=True) 965 self.assertEqual(frame.method.message_count, 0) 966 967 # Deposit a message in the queue 968 ch.basic_publish(exg_name, routing_key, body='TestQueueBindAndUnbindAndPurge', 969 mandatory=True) 970 971 # Check that the queue now has one message 972 frame = ch.queue_declare(q_name, passive=True) 973 self.assertEqual(frame.method.message_count, 1) 974 975 # Unbind the queue 976 frame = ch.queue_unbind(queue=q_name, exchange=exg_name, 977 routing_key=routing_key) 978 self.assertIsInstance(frame.method, pika.spec.Queue.UnbindOk) 979 980 # Verify that the queue is now unreachable via that binding 981 with self.assertRaises(pika.exceptions.UnroutableError): 982 ch.basic_publish(exg_name, routing_key, 983 body='TestQueueBindAndUnbindAndPurge-2', 984 mandatory=True) 985 986 # Purge the queue and verify that 1 message was purged 987 frame = ch.queue_purge(q_name) 988 self.assertIsInstance(frame.method, pika.spec.Queue.PurgeOk) 989 self.assertEqual(frame.method.message_count, 1) 990 991 # Verify that the queue is now empty 992 frame = ch.queue_declare(q_name, passive=True) 993 self.assertEqual(frame.method.message_count, 0) 994 995 996class TestBasicGet(BlockingTestCaseBase): 997 998 def tearDown(self): 999 LOGGER.info('%s TEARING DOWN (%s)', datetime.utcnow(), self) 1000 1001 def test(self): 1002 """BlockingChannel.basic_get""" 1003 LOGGER.info('%s STARTED (%s)', datetime.utcnow(), self) 1004 1005 connection = self._connect() 1006 LOGGER.info('%s CONNECTED (%s)', datetime.utcnow(), self) 1007 1008 ch = connection.channel() 1009 LOGGER.info('%s CREATED CHANNEL (%s)', datetime.utcnow(), self) 1010 1011 q_name = 'TestBasicGet_q' + uuid.uuid1().hex 1012 1013 # Place channel in publisher-acknowledgments mode so that the message 1014 # may be delivered synchronously to the queue by publishing it with 1015 # mandatory=True 1016 ch.confirm_delivery() 1017 LOGGER.info('%s ENABLED PUB-ACKS (%s)', datetime.utcnow(), self) 1018 1019 # Declare a new queue 1020 ch.queue_declare(q_name, auto_delete=True) 1021 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1022 LOGGER.info('%s DECLARED QUEUE (%s)', datetime.utcnow(), self) 1023 1024 # Verify result of getting a message from an empty queue 1025 msg = ch.basic_get(q_name, auto_ack=False) 1026 self.assertTupleEqual(msg, (None, None, None)) 1027 LOGGER.info('%s GOT FROM EMPTY QUEUE (%s)', datetime.utcnow(), self) 1028 1029 body = 'TestBasicGet' 1030 # Deposit a message in the queue via default exchange 1031 ch.basic_publish(exchange='', routing_key=q_name, 1032 body=body, mandatory=True) 1033 LOGGER.info('%s PUBLISHED (%s)', datetime.utcnow(), self) 1034 1035 # Get the message 1036 (method, properties, body) = ch.basic_get(q_name, auto_ack=False) 1037 LOGGER.info('%s GOT FROM NON-EMPTY QUEUE (%s)', datetime.utcnow(), self) 1038 self.assertIsInstance(method, pika.spec.Basic.GetOk) 1039 self.assertEqual(method.delivery_tag, 1) 1040 self.assertFalse(method.redelivered) 1041 self.assertEqual(method.exchange, '') 1042 self.assertEqual(method.routing_key, q_name) 1043 self.assertEqual(method.message_count, 0) 1044 1045 self.assertIsInstance(properties, pika.BasicProperties) 1046 self.assertIsNone(properties.headers) 1047 self.assertEqual(body, as_bytes(body)) 1048 1049 # Ack it 1050 ch.basic_ack(delivery_tag=method.delivery_tag) 1051 LOGGER.info('%s ACKED (%s)', datetime.utcnow(), self) 1052 1053 # Verify that the queue is now empty 1054 self._assert_exact_message_count_with_retries(channel=ch, 1055 queue=q_name, 1056 expected_count=0) 1057 1058 1059class TestBasicReject(BlockingTestCaseBase): 1060 1061 def test(self): 1062 """BlockingChannel.basic_reject""" 1063 connection = self._connect() 1064 1065 ch = connection.channel() 1066 1067 q_name = 'TestBasicReject_q' + uuid.uuid1().hex 1068 1069 # Place channel in publisher-acknowledgments mode so that the message 1070 # may be delivered synchronously to the queue by publishing it with 1071 # mandatory=True 1072 ch.confirm_delivery() 1073 1074 # Declare a new queue 1075 ch.queue_declare(q_name, auto_delete=True) 1076 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1077 1078 # Deposit two messages in the queue via default exchange 1079 ch.basic_publish(exchange='', routing_key=q_name, 1080 body='TestBasicReject1', mandatory=True) 1081 ch.basic_publish(exchange='', routing_key=q_name, 1082 body='TestBasicReject2', mandatory=True) 1083 1084 # Get the messages 1085 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1086 self.assertEqual(rx_body, as_bytes('TestBasicReject1')) 1087 1088 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1089 self.assertEqual(rx_body, as_bytes('TestBasicReject2')) 1090 1091 # Nack the second message 1092 ch.basic_reject(rx_method.delivery_tag, requeue=True) 1093 1094 # Verify that exactly one message is present in the queue, namely the 1095 # second one 1096 self._assert_exact_message_count_with_retries(channel=ch, 1097 queue=q_name, 1098 expected_count=1) 1099 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1100 self.assertEqual(rx_body, as_bytes('TestBasicReject2')) 1101 1102 1103class TestBasicRejectNoRequeue(BlockingTestCaseBase): 1104 1105 def test(self): 1106 """BlockingChannel.basic_reject with requeue=False""" 1107 connection = self._connect() 1108 1109 ch = connection.channel() 1110 1111 q_name = 'TestBasicRejectNoRequeue_q' + uuid.uuid1().hex 1112 1113 # Place channel in publisher-acknowledgments mode so that the message 1114 # may be delivered synchronously to the queue by publishing it with 1115 # mandatory=True 1116 ch.confirm_delivery() 1117 1118 # Declare a new queue 1119 ch.queue_declare(q_name, auto_delete=True) 1120 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1121 1122 # Deposit two messages in the queue via default exchange 1123 ch.basic_publish(exchange='', routing_key=q_name, 1124 body='TestBasicRejectNoRequeue1', mandatory=True) 1125 ch.basic_publish(exchange='', routing_key=q_name, 1126 body='TestBasicRejectNoRequeue2', mandatory=True) 1127 1128 # Get the messages 1129 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1130 self.assertEqual(rx_body, 1131 as_bytes('TestBasicRejectNoRequeue1')) 1132 1133 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1134 self.assertEqual(rx_body, 1135 as_bytes('TestBasicRejectNoRequeue2')) 1136 1137 # Nack the second message 1138 ch.basic_reject(rx_method.delivery_tag, requeue=False) 1139 1140 # Verify that no messages are present in the queue 1141 self._assert_exact_message_count_with_retries(channel=ch, 1142 queue=q_name, 1143 expected_count=0) 1144 1145 1146class TestBasicNack(BlockingTestCaseBase): 1147 1148 def test(self): 1149 """BlockingChannel.basic_nack single message""" 1150 connection = self._connect() 1151 1152 ch = connection.channel() 1153 1154 q_name = 'TestBasicNack_q' + uuid.uuid1().hex 1155 1156 # Place channel in publisher-acknowledgments mode so that the message 1157 # may be delivered synchronously to the queue by publishing it with 1158 # mandatory=True 1159 ch.confirm_delivery() 1160 1161 # Declare a new queue 1162 ch.queue_declare(q_name, auto_delete=True) 1163 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1164 1165 # Deposit two messages in the queue via default exchange 1166 ch.basic_publish(exchange='', routing_key=q_name, 1167 body='TestBasicNack1', mandatory=True) 1168 ch.basic_publish(exchange='', routing_key=q_name, 1169 body='TestBasicNack2', mandatory=True) 1170 1171 # Get the messages 1172 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1173 self.assertEqual(rx_body, as_bytes('TestBasicNack1')) 1174 1175 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1176 self.assertEqual(rx_body, as_bytes('TestBasicNack2')) 1177 1178 # Nack the second message 1179 ch.basic_nack(rx_method.delivery_tag, multiple=False, requeue=True) 1180 1181 # Verify that exactly one message is present in the queue, namely the 1182 # second one 1183 self._assert_exact_message_count_with_retries(channel=ch, 1184 queue=q_name, 1185 expected_count=1) 1186 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1187 self.assertEqual(rx_body, as_bytes('TestBasicNack2')) 1188 1189 1190class TestBasicNackNoRequeue(BlockingTestCaseBase): 1191 1192 def test(self): 1193 """BlockingChannel.basic_nack with requeue=False""" 1194 connection = self._connect() 1195 1196 ch = connection.channel() 1197 1198 q_name = 'TestBasicNackNoRequeue_q' + uuid.uuid1().hex 1199 1200 # Place channel in publisher-acknowledgments mode so that the message 1201 # may be delivered synchronously to the queue by publishing it with 1202 # mandatory=True 1203 ch.confirm_delivery() 1204 1205 # Declare a new queue 1206 ch.queue_declare(q_name, auto_delete=True) 1207 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1208 1209 # Deposit two messages in the queue via default exchange 1210 ch.basic_publish(exchange='', routing_key=q_name, 1211 body='TestBasicNackNoRequeue1', mandatory=True) 1212 ch.basic_publish(exchange='', routing_key=q_name, 1213 body='TestBasicNackNoRequeue2', mandatory=True) 1214 1215 # Get the messages 1216 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1217 self.assertEqual(rx_body, 1218 as_bytes('TestBasicNackNoRequeue1')) 1219 1220 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1221 self.assertEqual(rx_body, 1222 as_bytes('TestBasicNackNoRequeue2')) 1223 1224 # Nack the second message 1225 ch.basic_nack(rx_method.delivery_tag, requeue=False) 1226 1227 # Verify that no messages are present in the queue 1228 self._assert_exact_message_count_with_retries(channel=ch, 1229 queue=q_name, 1230 expected_count=0) 1231 1232 1233class TestBasicNackMultiple(BlockingTestCaseBase): 1234 1235 def test(self): 1236 """BlockingChannel.basic_nack multiple messages""" 1237 connection = self._connect() 1238 1239 ch = connection.channel() 1240 1241 q_name = 'TestBasicNackMultiple_q' + uuid.uuid1().hex 1242 1243 # Place channel in publisher-acknowledgments mode so that the message 1244 # may be delivered synchronously to the queue by publishing it with 1245 # mandatory=True 1246 ch.confirm_delivery() 1247 1248 # Declare a new queue 1249 ch.queue_declare(q_name, auto_delete=True) 1250 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1251 1252 # Deposit two messages in the queue via default exchange 1253 ch.basic_publish(exchange='', routing_key=q_name, 1254 body='TestBasicNackMultiple1', mandatory=True) 1255 ch.basic_publish(exchange='', routing_key=q_name, 1256 body='TestBasicNackMultiple2', mandatory=True) 1257 1258 # Get the messages 1259 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1260 self.assertEqual(rx_body, 1261 as_bytes('TestBasicNackMultiple1')) 1262 1263 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1264 self.assertEqual(rx_body, 1265 as_bytes('TestBasicNackMultiple2')) 1266 1267 # Nack both messages via the "multiple" option 1268 ch.basic_nack(rx_method.delivery_tag, multiple=True, requeue=True) 1269 1270 # Verify that both messages are present in the queue 1271 self._assert_exact_message_count_with_retries(channel=ch, 1272 queue=q_name, 1273 expected_count=2) 1274 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1275 self.assertEqual(rx_body, 1276 as_bytes('TestBasicNackMultiple1')) 1277 (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1278 self.assertEqual(rx_body, 1279 as_bytes('TestBasicNackMultiple2')) 1280 1281 1282class TestBasicRecoverWithRequeue(BlockingTestCaseBase): 1283 1284 def test(self): 1285 """BlockingChannel.basic_recover with requeue=True. 1286 1287 NOTE: the requeue=False option is not supported by RabbitMQ broker as 1288 of this writing (using RabbitMQ 3.5.1) 1289 """ 1290 connection = self._connect() 1291 1292 ch = connection.channel() 1293 1294 q_name = ( 1295 'TestBasicRecoverWithRequeue_q' + uuid.uuid1().hex) 1296 1297 # Place channel in publisher-acknowledgments mode so that the message 1298 # may be delivered synchronously to the queue by publishing it with 1299 # mandatory=True 1300 ch.confirm_delivery() 1301 1302 # Declare a new queue 1303 ch.queue_declare(q_name, auto_delete=True) 1304 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1305 1306 # Deposit two messages in the queue via default exchange 1307 ch.basic_publish(exchange='', routing_key=q_name, 1308 body='TestBasicRecoverWithRequeue1', mandatory=True) 1309 ch.basic_publish(exchange='', routing_key=q_name, 1310 body='TestBasicRecoverWithRequeue2', mandatory=True) 1311 1312 rx_messages = [] 1313 num_messages = 0 1314 for msg in ch.consume(q_name, auto_ack=False): 1315 num_messages += 1 1316 1317 if num_messages == 2: 1318 ch.basic_recover(requeue=True) 1319 1320 if num_messages > 2: 1321 rx_messages.append(msg) 1322 1323 if num_messages == 4: 1324 break 1325 else: 1326 self.fail('consumer aborted prematurely') 1327 1328 # Get the messages 1329 (_, _, rx_body) = rx_messages[0] 1330 self.assertEqual(rx_body, 1331 as_bytes('TestBasicRecoverWithRequeue1')) 1332 1333 (_, _, rx_body) = rx_messages[1] 1334 self.assertEqual(rx_body, 1335 as_bytes('TestBasicRecoverWithRequeue2')) 1336 1337 1338class TestTxCommit(BlockingTestCaseBase): 1339 1340 def test(self): 1341 """BlockingChannel.tx_commit""" 1342 connection = self._connect() 1343 1344 ch = connection.channel() 1345 1346 q_name = 'TestTxCommit_q' + uuid.uuid1().hex 1347 1348 # Declare a new queue 1349 ch.queue_declare(q_name, auto_delete=True) 1350 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1351 1352 # Select standard transaction mode 1353 frame = ch.tx_select() 1354 self.assertIsInstance(frame.method, pika.spec.Tx.SelectOk) 1355 1356 # Deposit a message in the queue via default exchange 1357 ch.basic_publish(exchange='', routing_key=q_name, 1358 body='TestTxCommit1', mandatory=True) 1359 1360 # Verify that queue is still empty 1361 frame = ch.queue_declare(q_name, passive=True) 1362 self.assertEqual(frame.method.message_count, 0) 1363 1364 # Commit the transaction 1365 ch.tx_commit() 1366 1367 # Verify that the queue has the expected message 1368 frame = ch.queue_declare(q_name, passive=True) 1369 self.assertEqual(frame.method.message_count, 1) 1370 1371 (_, _, rx_body) = ch.basic_get(q_name, auto_ack=False) 1372 self.assertEqual(rx_body, as_bytes('TestTxCommit1')) 1373 1374 1375class TestTxRollback(BlockingTestCaseBase): 1376 1377 def test(self): 1378 """BlockingChannel.tx_commit""" 1379 connection = self._connect() 1380 1381 ch = connection.channel() 1382 1383 q_name = 'TestTxRollback_q' + uuid.uuid1().hex 1384 1385 # Declare a new queue 1386 ch.queue_declare(q_name, auto_delete=True) 1387 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1388 1389 # Select standard transaction mode 1390 frame = ch.tx_select() 1391 self.assertIsInstance(frame.method, pika.spec.Tx.SelectOk) 1392 1393 # Deposit a message in the queue via default exchange 1394 ch.basic_publish(exchange='', routing_key=q_name, 1395 body='TestTxRollback1', mandatory=True) 1396 1397 # Verify that queue is still empty 1398 frame = ch.queue_declare(q_name, passive=True) 1399 self.assertEqual(frame.method.message_count, 0) 1400 1401 # Roll back the transaction 1402 ch.tx_rollback() 1403 1404 # Verify that the queue continues to be empty 1405 frame = ch.queue_declare(q_name, passive=True) 1406 self.assertEqual(frame.method.message_count, 0) 1407 1408 1409class TestBasicConsumeFromUnknownQueueRaisesChannelClosed(BlockingTestCaseBase): 1410 def test(self): 1411 """ChannelClosed raised when consuming from unknown queue""" 1412 connection = self._connect() 1413 ch = connection.channel() 1414 1415 q_name = ("TestBasicConsumeFromUnknownQueueRaisesChannelClosed_q_" + 1416 uuid.uuid1().hex) 1417 1418 with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as ex_cm: 1419 ch.basic_consume(q_name, lambda *args: None) 1420 1421 self.assertEqual(ex_cm.exception.args[0], 404) 1422 1423 1424class TestPublishAndBasicPublishWithPubacksUnroutable(BlockingTestCaseBase): 1425 1426 def test(self): # pylint: disable=R0914 1427 """BlockingChannel.publish amd basic_publish unroutable message with pubacks""" # pylint: disable=C0301 1428 connection = self._connect() 1429 1430 ch = connection.channel() 1431 1432 exg_name = ('TestPublishAndBasicPublishUnroutable_exg_' + 1433 uuid.uuid1().hex) 1434 routing_key = 'TestPublishAndBasicPublishUnroutable' 1435 1436 # Place channel in publisher-acknowledgments mode so that publishing 1437 # with mandatory=True will be synchronous 1438 res = ch.confirm_delivery() 1439 self.assertIsNone(res) 1440 1441 # Declare a new exchange 1442 ch.exchange_declare(exg_name, exchange_type='direct') 1443 self.addCleanup(connection.channel().exchange_delete, exg_name) 1444 1445 # Verify unroutable message handling using basic_publish 1446 msg2_headers = dict( 1447 test_name='TestPublishAndBasicPublishWithPubacksUnroutable') 1448 msg2_properties = pika.spec.BasicProperties(headers=msg2_headers) 1449 with self.assertRaises(pika.exceptions.UnroutableError) as cm: 1450 ch.basic_publish(exg_name, routing_key=routing_key, body='', 1451 properties=msg2_properties, mandatory=True) 1452 (msg,) = cm.exception.messages 1453 self.assertIsInstance(msg, blocking_connection.ReturnedMessage) 1454 self.assertIsInstance(msg.method, pika.spec.Basic.Return) 1455 self.assertEqual(msg.method.reply_code, 312) 1456 self.assertEqual(msg.method.exchange, exg_name) 1457 self.assertEqual(msg.method.routing_key, routing_key) 1458 self.assertIsInstance(msg.properties, pika.BasicProperties) 1459 self.assertEqual(msg.properties.headers, msg2_headers) 1460 self.assertEqual(msg.body, as_bytes('')) 1461 1462 1463class TestConfirmDeliveryAfterUnroutableMessage(BlockingTestCaseBase): 1464 1465 def test(self): # pylint: disable=R0914 1466 """BlockingChannel.confirm_delivery following unroutable message""" 1467 connection = self._connect() 1468 1469 ch = connection.channel() 1470 1471 exg_name = ('TestConfirmDeliveryAfterUnroutableMessage_exg_' + 1472 uuid.uuid1().hex) 1473 routing_key = 'TestConfirmDeliveryAfterUnroutableMessage' 1474 1475 # Declare a new exchange 1476 ch.exchange_declare(exg_name, exchange_type='direct') 1477 self.addCleanup(connection.channel().exchange_delete, exg_name) 1478 1479 # Register on-return callback 1480 returned_messages = [] 1481 ch.add_on_return_callback(lambda *args: returned_messages.append(args)) 1482 1483 # Emit unroutable message without pubacks 1484 ch.basic_publish(exg_name, routing_key=routing_key, body='', mandatory=True) 1485 1486 # Select delivery confirmations 1487 ch.confirm_delivery() 1488 1489 # Verify that unroutable message is in pending events 1490 self.assertEqual(len(ch._pending_events), 1) 1491 self.assertIsInstance(ch._pending_events[0], 1492 blocking_connection._ReturnedMessageEvt) 1493 # Verify that repr of _ReturnedMessageEvt instance does crash 1494 repr(ch._pending_events[0]) 1495 1496 # Dispach events 1497 connection.process_data_events() 1498 1499 self.assertEqual(len(ch._pending_events), 0) 1500 1501 # Verify that unroutable message was dispatched 1502 ((channel, method, properties, body,),) = returned_messages # pylint: disable=E0632 1503 self.assertIs(channel, ch) 1504 self.assertIsInstance(method, pika.spec.Basic.Return) 1505 self.assertEqual(method.reply_code, 312) 1506 self.assertEqual(method.exchange, exg_name) 1507 self.assertEqual(method.routing_key, routing_key) 1508 self.assertIsInstance(properties, pika.BasicProperties) 1509 self.assertEqual(body, as_bytes('')) 1510 1511 1512class TestUnroutableMessagesReturnedInNonPubackMode(BlockingTestCaseBase): 1513 1514 def test(self): # pylint: disable=R0914 1515 """BlockingChannel: unroutable messages is returned in non-puback mode""" # pylint: disable=C0301 1516 connection = self._connect() 1517 1518 ch = connection.channel() 1519 1520 exg_name = ( 1521 'TestUnroutableMessageReturnedInNonPubackMode_exg_' 1522 + uuid.uuid1().hex) 1523 routing_key = 'TestUnroutableMessageReturnedInNonPubackMode' 1524 1525 # Declare a new exchange 1526 ch.exchange_declare(exg_name, exchange_type='direct') 1527 self.addCleanup(connection.channel().exchange_delete, exg_name) 1528 1529 # Register on-return callback 1530 returned_messages = [] 1531 ch.add_on_return_callback( 1532 lambda *args: returned_messages.append(args)) 1533 1534 # Emit unroutable messages without pubacks 1535 ch.basic_publish(exg_name, routing_key=routing_key, body='msg1', mandatory=True) 1536 ch.basic_publish(exg_name, routing_key=routing_key, body='msg2', mandatory=True) 1537 1538 # Process I/O until Basic.Return are dispatched 1539 while len(returned_messages) < 2: 1540 connection.process_data_events() 1541 1542 self.assertEqual(len(returned_messages), 2) 1543 1544 self.assertEqual(len(ch._pending_events), 0) 1545 1546 # Verify returned messages 1547 (channel, method, properties, body,) = returned_messages[0] 1548 self.assertIs(channel, ch) 1549 self.assertIsInstance(method, pika.spec.Basic.Return) 1550 self.assertEqual(method.reply_code, 312) 1551 self.assertEqual(method.exchange, exg_name) 1552 self.assertEqual(method.routing_key, routing_key) 1553 self.assertIsInstance(properties, pika.BasicProperties) 1554 self.assertEqual(body, as_bytes('msg1')) 1555 1556 (channel, method, properties, body,) = returned_messages[1] 1557 self.assertIs(channel, ch) 1558 self.assertIsInstance(method, pika.spec.Basic.Return) 1559 self.assertEqual(method.reply_code, 312) 1560 self.assertEqual(method.exchange, exg_name) 1561 self.assertEqual(method.routing_key, routing_key) 1562 self.assertIsInstance(properties, pika.BasicProperties) 1563 self.assertEqual(body, as_bytes('msg2')) 1564 1565 1566class TestUnroutableMessageReturnedInPubackMode(BlockingTestCaseBase): 1567 1568 def test(self): # pylint: disable=R0914 1569 """BlockingChannel: unroutable messages is returned in puback mode""" 1570 connection = self._connect() 1571 1572 ch = connection.channel() 1573 1574 exg_name = ( 1575 'TestUnroutableMessageReturnedInPubackMode_exg_' 1576 + uuid.uuid1().hex) 1577 routing_key = 'TestUnroutableMessageReturnedInPubackMode' 1578 1579 # Declare a new exchange 1580 ch.exchange_declare(exg_name, exchange_type='direct') 1581 self.addCleanup(connection.channel().exchange_delete, exg_name) 1582 1583 # Select delivery confirmations 1584 ch.confirm_delivery() 1585 1586 # Register on-return callback 1587 returned_messages = [] 1588 ch.add_on_return_callback( 1589 lambda *args: returned_messages.append(args)) 1590 1591 # Emit unroutable messages with pubacks 1592 with self.assertRaises(pika.exceptions.UnroutableError): 1593 ch.basic_publish(exg_name, routing_key=routing_key, body='msg1', 1594 mandatory=True) 1595 with self.assertRaises(pika.exceptions.UnroutableError): 1596 ch.basic_publish(exg_name, routing_key=routing_key, body='msg2', 1597 mandatory=True) 1598 1599 # Verify that unroutable messages are already in pending events 1600 self.assertEqual(len(ch._pending_events), 2) 1601 self.assertIsInstance(ch._pending_events[0], 1602 blocking_connection._ReturnedMessageEvt) 1603 self.assertIsInstance(ch._pending_events[1], 1604 blocking_connection._ReturnedMessageEvt) 1605 # Verify that repr of _ReturnedMessageEvt instance does crash 1606 repr(ch._pending_events[0]) 1607 repr(ch._pending_events[1]) 1608 1609 # Dispatch events 1610 connection.process_data_events() 1611 1612 self.assertEqual(len(ch._pending_events), 0) 1613 1614 # Verify returned messages 1615 (channel, method, properties, body,) = returned_messages[0] 1616 self.assertIs(channel, ch) 1617 self.assertIsInstance(method, pika.spec.Basic.Return) 1618 self.assertEqual(method.reply_code, 312) 1619 self.assertEqual(method.exchange, exg_name) 1620 self.assertEqual(method.routing_key, routing_key) 1621 self.assertIsInstance(properties, pika.BasicProperties) 1622 self.assertEqual(body, as_bytes('msg1')) 1623 1624 (channel, method, properties, body,) = returned_messages[1] 1625 self.assertIs(channel, ch) 1626 self.assertIsInstance(method, pika.spec.Basic.Return) 1627 self.assertEqual(method.reply_code, 312) 1628 self.assertEqual(method.exchange, exg_name) 1629 self.assertEqual(method.routing_key, routing_key) 1630 self.assertIsInstance(properties, pika.BasicProperties) 1631 self.assertEqual(body, as_bytes('msg2')) 1632 1633 1634class TestBasicPublishDeliveredWhenPendingUnroutable(BlockingTestCaseBase): 1635 1636 def test(self): # pylint: disable=R0914 1637 """BlockingChannel.basic_publish msg delivered despite pending unroutable message""" # pylint: disable=C0301 1638 connection = self._connect() 1639 1640 ch = connection.channel() 1641 1642 q_name = ('TestBasicPublishDeliveredWhenPendingUnroutable_q' + 1643 uuid.uuid1().hex) 1644 exg_name = ('TestBasicPublishDeliveredWhenPendingUnroutable_exg_' + 1645 uuid.uuid1().hex) 1646 routing_key = 'TestBasicPublishDeliveredWhenPendingUnroutable' 1647 1648 # Declare a new exchange 1649 ch.exchange_declare(exg_name, exchange_type='direct') 1650 self.addCleanup(connection.channel().exchange_delete, exg_name) 1651 1652 # Declare a new queue 1653 ch.queue_declare(q_name, auto_delete=True) 1654 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1655 1656 # Bind the queue to the exchange using routing key 1657 ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) 1658 1659 # Attempt to send an unroutable message in the queue via basic_publish 1660 ch.basic_publish(exg_name, routing_key='', 1661 body='unroutable-message', 1662 mandatory=True) 1663 1664 # Flush connection to force Basic.Return 1665 connection.channel().close() 1666 1667 # Deposit a routable message in the queue 1668 ch.basic_publish(exg_name, routing_key=routing_key, 1669 body='routable-message', 1670 mandatory=True) 1671 1672 # Wait for the queue to get the routable message 1673 self._assert_exact_message_count_with_retries(channel=ch, 1674 queue=q_name, 1675 expected_count=1) 1676 1677 msg = ch.basic_get(q_name) 1678 1679 # Check the first message 1680 self.assertIsInstance(msg, tuple) 1681 rx_method, rx_properties, rx_body = msg 1682 self.assertIsInstance(rx_method, pika.spec.Basic.GetOk) 1683 self.assertEqual(rx_method.delivery_tag, 1) 1684 self.assertFalse(rx_method.redelivered) 1685 self.assertEqual(rx_method.exchange, exg_name) 1686 self.assertEqual(rx_method.routing_key, routing_key) 1687 1688 self.assertIsInstance(rx_properties, pika.BasicProperties) 1689 self.assertEqual(rx_body, as_bytes('routable-message')) 1690 1691 # There shouldn't be any more events now 1692 self.assertFalse(ch._pending_events) 1693 1694 # Ack the message 1695 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 1696 1697 # Verify that the queue is now empty 1698 self._assert_exact_message_count_with_retries(channel=ch, 1699 queue=q_name, 1700 expected_count=0) 1701 1702 1703class TestPublishAndConsumeWithPubacksAndQosOfOne(BlockingTestCaseBase): 1704 1705 def test(self): # pylint: disable=R0914,R0915 1706 """BlockingChannel.basic_publish, publish, basic_consume, QoS, \ 1707 Basic.Cancel from broker 1708 """ 1709 connection = self._connect() 1710 1711 ch = connection.channel() 1712 1713 q_name = 'TestPublishAndConsumeAndQos_q' + uuid.uuid1().hex 1714 exg_name = 'TestPublishAndConsumeAndQos_exg_' + uuid.uuid1().hex 1715 routing_key = 'TestPublishAndConsumeAndQos' 1716 1717 # Place channel in publisher-acknowledgments mode so that publishing 1718 # with mandatory=True will be synchronous 1719 res = ch.confirm_delivery() 1720 self.assertIsNone(res) 1721 1722 # Declare a new exchange 1723 ch.exchange_declare(exg_name, exchange_type='direct') 1724 self.addCleanup(connection.channel().exchange_delete, exg_name) 1725 1726 # Declare a new queue 1727 ch.queue_declare(q_name, auto_delete=True) 1728 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1729 1730 # Bind the queue to the exchange using routing key 1731 ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) 1732 1733 # Deposit a message in the queue 1734 msg1_headers = dict( 1735 test_name='TestPublishAndConsumeWithPubacksAndQosOfOne') 1736 msg1_properties = pika.spec.BasicProperties(headers=msg1_headers) 1737 ch.basic_publish(exg_name, routing_key=routing_key, 1738 body='via-basic_publish', 1739 properties=msg1_properties, 1740 mandatory=True) 1741 1742 # Deposit another message in the queue 1743 ch.basic_publish(exg_name, routing_key, body='via-publish', 1744 mandatory=True) 1745 1746 # Check that the queue now has two messages 1747 frame = ch.queue_declare(q_name, passive=True) 1748 self.assertEqual(frame.method.message_count, 2) 1749 1750 # Configure QoS for one message 1751 ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False) 1752 1753 # Create a consumer 1754 rx_messages = [] 1755 consumer_tag = ch.basic_consume( 1756 q_name, 1757 lambda *args: rx_messages.append(args), 1758 auto_ack=False, 1759 exclusive=False, 1760 arguments=None) 1761 1762 # Wait for first message to arrive 1763 while not rx_messages: 1764 connection.process_data_events(time_limit=None) 1765 1766 self.assertEqual(len(rx_messages), 1) 1767 1768 # Check the first message 1769 msg = rx_messages[0] 1770 self.assertIsInstance(msg, tuple) 1771 rx_ch, rx_method, rx_properties, rx_body = msg 1772 self.assertIs(rx_ch, ch) 1773 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 1774 self.assertEqual(rx_method.consumer_tag, consumer_tag) 1775 self.assertEqual(rx_method.delivery_tag, 1) 1776 self.assertFalse(rx_method.redelivered) 1777 self.assertEqual(rx_method.exchange, exg_name) 1778 self.assertEqual(rx_method.routing_key, routing_key) 1779 1780 self.assertIsInstance(rx_properties, pika.BasicProperties) 1781 self.assertEqual(rx_properties.headers, msg1_headers) 1782 self.assertEqual(rx_body, as_bytes('via-basic_publish')) 1783 1784 # There shouldn't be any more events now 1785 self.assertFalse(ch._pending_events) 1786 1787 # Ack the message so that the next one can arrive (we configured QoS 1788 # with prefetch_count=1) 1789 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 1790 1791 # Get the second message 1792 while len(rx_messages) < 2: 1793 connection.process_data_events(time_limit=None) 1794 1795 self.assertEqual(len(rx_messages), 2) 1796 1797 msg = rx_messages[1] 1798 self.assertIsInstance(msg, tuple) 1799 rx_ch, rx_method, rx_properties, rx_body = msg 1800 self.assertIs(rx_ch, ch) 1801 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 1802 self.assertEqual(rx_method.consumer_tag, consumer_tag) 1803 self.assertEqual(rx_method.delivery_tag, 2) 1804 self.assertFalse(rx_method.redelivered) 1805 self.assertEqual(rx_method.exchange, exg_name) 1806 self.assertEqual(rx_method.routing_key, routing_key) 1807 1808 self.assertIsInstance(rx_properties, pika.BasicProperties) 1809 self.assertEqual(rx_body, as_bytes('via-publish')) 1810 1811 # There shouldn't be any more events now 1812 self.assertFalse(ch._pending_events) 1813 1814 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 1815 1816 # Verify that the queue is now empty 1817 self._assert_exact_message_count_with_retries(channel=ch, 1818 queue=q_name, 1819 expected_count=0) 1820 1821 # Attempt to consume again with a short timeout 1822 connection.process_data_events(time_limit=0.005) 1823 self.assertEqual(len(rx_messages), 2) 1824 1825 # Delete the queue and wait for consumer cancellation 1826 rx_cancellations = [] 1827 ch.add_on_cancel_callback(rx_cancellations.append) 1828 ch.queue_delete(q_name) 1829 ch.start_consuming() 1830 1831 self.assertEqual(len(rx_cancellations), 1) 1832 frame, = rx_cancellations # pylint: disable=E0632 1833 self.assertEqual(frame.method.consumer_tag, consumer_tag) 1834 1835 1836class TestBasicConsumeWithAckFromAnotherThread(BlockingTestCaseBase): 1837 1838 def test(self): # pylint: disable=R0914,R0915 1839 """BlockingChannel.basic_consume with ack from another thread and \ 1840 requesting basic_ack via add_callback_threadsafe 1841 """ 1842 # This test simulates processing of a message on another thread and 1843 # then requesting an ACK to be dispatched on the connection's thread 1844 # via BlockingConnection.add_callback_threadsafe 1845 1846 connection = self._connect() 1847 1848 ch = connection.channel() 1849 1850 q_name = 'TestBasicConsumeWithAckFromAnotherThread_q' + uuid.uuid1().hex 1851 exg_name = ('TestBasicConsumeWithAckFromAnotherThread_exg' + 1852 uuid.uuid1().hex) 1853 routing_key = 'TestBasicConsumeWithAckFromAnotherThread' 1854 1855 # Place channel in publisher-acknowledgments mode so that publishing 1856 # with mandatory=True will be synchronous (for convenience) 1857 res = ch.confirm_delivery() 1858 self.assertIsNone(res) 1859 1860 # Declare a new exchange 1861 ch.exchange_declare(exg_name, exchange_type='direct') 1862 self.addCleanup(connection.channel().exchange_delete, exg_name) 1863 1864 # Declare a new queue 1865 ch.queue_declare(q_name, auto_delete=True) 1866 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1867 1868 # Bind the queue to the exchange using routing key 1869 ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) 1870 1871 # Publish 2 messages with mandatory=True for synchronous processing 1872 ch.basic_publish(exg_name, routing_key, body='msg1', mandatory=True) 1873 ch.basic_publish(exg_name, routing_key, body='last-msg', mandatory=True) 1874 1875 # Configure QoS for one message so that the 2nd message will be 1876 # delivered only after the 1st one is ACKed 1877 ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False) 1878 1879 # Create a consumer 1880 rx_messages = [] 1881 def ackAndEnqueueMessageViaAnotherThread(rx_ch, 1882 rx_method, 1883 rx_properties, # pylint: disable=W0613 1884 rx_body): 1885 LOGGER.debug( 1886 '%s: Got message body=%r; delivery-tag=%r', 1887 datetime.now(), rx_body, rx_method.delivery_tag) 1888 1889 # Request ACK dispatch via add_callback_threadsafe from other 1890 # thread; if last message, cancel consumer so that start_consuming 1891 # can return 1892 1893 def processOnConnectionThread(): 1894 LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r', 1895 datetime.now(), 1896 rx_body, 1897 rx_method.delivery_tag) 1898 ch.basic_ack(delivery_tag=rx_method.delivery_tag, 1899 multiple=False) 1900 rx_messages.append(rx_body) 1901 1902 # NOTE on python3, `b'last-msg' != 'last-msg'` 1903 if rx_body == b'last-msg': 1904 LOGGER.debug('%s: Canceling consumer consumer-tag=%r', 1905 datetime.now(), 1906 rx_method.consumer_tag) 1907 rx_ch.basic_cancel(rx_method.consumer_tag) 1908 1909 # Spawn a thread to initiate ACKing 1910 timer = threading.Timer(0, 1911 lambda: connection.add_callback_threadsafe( 1912 processOnConnectionThread)) 1913 self.addCleanup(timer.cancel) 1914 timer.start() 1915 1916 consumer_tag = ch.basic_consume( 1917 q_name, 1918 ackAndEnqueueMessageViaAnotherThread, 1919 auto_ack=False, 1920 exclusive=False, 1921 arguments=None) 1922 1923 # Wait for both messages 1924 LOGGER.debug('%s: calling start_consuming(); consumer tag=%r', 1925 datetime.now(), 1926 consumer_tag) 1927 ch.start_consuming() 1928 LOGGER.debug('%s: Returned from start_consuming(); consumer tag=%r', 1929 datetime.now(), 1930 consumer_tag) 1931 1932 self.assertEqual(len(rx_messages), 2) 1933 self.assertEqual(rx_messages[0], b'msg1') 1934 self.assertEqual(rx_messages[1], b'last-msg') 1935 1936 1937class TestConsumeGeneratorWithAckFromAnotherThread(BlockingTestCaseBase): 1938 1939 def test(self): # pylint: disable=R0914,R0915 1940 """BlockingChannel.consume and requesting basic_ack from another \ 1941 thread via add_callback_threadsafe 1942 """ 1943 connection = self._connect() 1944 1945 ch = connection.channel() 1946 1947 q_name = ('TestConsumeGeneratorWithAckFromAnotherThread_q' + 1948 uuid.uuid1().hex) 1949 exg_name = ('TestConsumeGeneratorWithAckFromAnotherThread_exg' + 1950 uuid.uuid1().hex) 1951 routing_key = 'TestConsumeGeneratorWithAckFromAnotherThread' 1952 1953 # Place channel in publisher-acknowledgments mode so that publishing 1954 # with mandatory=True will be synchronous (for convenience) 1955 res = ch.confirm_delivery() 1956 self.assertIsNone(res) 1957 1958 # Declare a new exchange 1959 ch.exchange_declare(exg_name, exchange_type='direct') 1960 self.addCleanup(connection.channel().exchange_delete, exg_name) 1961 1962 # Declare a new queue 1963 ch.queue_declare(q_name, auto_delete=True) 1964 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 1965 1966 # Bind the queue to the exchange using routing key 1967 ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) 1968 1969 # Publish 2 messages with mandatory=True for synchronous processing 1970 ch.basic_publish(exg_name, routing_key, body='msg1', mandatory=True) 1971 ch.basic_publish(exg_name, routing_key, body='last-msg', mandatory=True) 1972 1973 # Configure QoS for one message so that the 2nd message will be 1974 # delivered only after the 1st one is ACKed 1975 ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False) 1976 1977 # Create a consumer 1978 rx_messages = [] 1979 def ackAndEnqueueMessageViaAnotherThread(rx_ch, 1980 rx_method, 1981 rx_properties, # pylint: disable=W0613 1982 rx_body): 1983 LOGGER.debug( 1984 '%s: Got message body=%r; delivery-tag=%r', 1985 datetime.now(), rx_body, rx_method.delivery_tag) 1986 1987 # Request ACK dispatch via add_callback_threadsafe from other 1988 # thread; if last message, cancel consumer so that consumer 1989 # generator completes 1990 1991 def processOnConnectionThread(): 1992 LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r', 1993 datetime.now(), 1994 rx_body, 1995 rx_method.delivery_tag) 1996 ch.basic_ack(delivery_tag=rx_method.delivery_tag, 1997 multiple=False) 1998 rx_messages.append(rx_body) 1999 2000 # NOTE on python3, `b'last-msg' != 'last-msg'` 2001 if rx_body == b'last-msg': 2002 LOGGER.debug('%s: Canceling consumer consumer-tag=%r', 2003 datetime.now(), 2004 rx_method.consumer_tag) 2005 # NOTE Need to use cancel() for the consumer generator 2006 # instead of basic_cancel() 2007 rx_ch.cancel() 2008 2009 # Spawn a thread to initiate ACKing 2010 timer = threading.Timer(0, 2011 lambda: connection.add_callback_threadsafe( 2012 processOnConnectionThread)) 2013 self.addCleanup(timer.cancel) 2014 timer.start() 2015 2016 for method, properties, body in ch.consume(q_name, auto_ack=False): 2017 ackAndEnqueueMessageViaAnotherThread(rx_ch=ch, 2018 rx_method=method, 2019 rx_properties=properties, 2020 rx_body=body) 2021 2022 self.assertEqual(len(rx_messages), 2) 2023 self.assertEqual(rx_messages[0], b'msg1') 2024 self.assertEqual(rx_messages[1], b'last-msg') 2025 2026 2027class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase): 2028 2029 def test(self): # pylint: disable=R0914 2030 """BlockingChannel: two basic_consume consumers on same channel 2031 """ 2032 connection = self._connect() 2033 2034 ch = connection.channel() 2035 2036 exg_name = 'TestPublishAndConsumeAndQos_exg_' + uuid.uuid1().hex 2037 q1_name = 'TestTwoBasicConsumersOnSameChannel_q1' + uuid.uuid1().hex 2038 q2_name = 'TestTwoBasicConsumersOnSameChannel_q2' + uuid.uuid1().hex 2039 q1_routing_key = 'TestTwoBasicConsumersOnSameChannel1' 2040 q2_routing_key = 'TestTwoBasicConsumersOnSameChannel2' 2041 2042 # Place channel in publisher-acknowledgments mode so that publishing 2043 # with mandatory=True will be synchronous 2044 ch.confirm_delivery() 2045 2046 # Declare a new exchange 2047 ch.exchange_declare(exg_name, exchange_type='direct') 2048 self.addCleanup(connection.channel().exchange_delete, exg_name) 2049 2050 # Declare the two new queues and bind them to the exchange 2051 ch.queue_declare(q1_name, auto_delete=True) 2052 self.addCleanup(lambda: self._connect().channel().queue_delete(q1_name)) 2053 ch.queue_bind(q1_name, exchange=exg_name, routing_key=q1_routing_key) 2054 2055 ch.queue_declare(q2_name, auto_delete=True) 2056 self.addCleanup(lambda: self._connect().channel().queue_delete(q2_name)) 2057 ch.queue_bind(q2_name, exchange=exg_name, routing_key=q2_routing_key) 2058 2059 # Deposit messages in the queues 2060 q1_tx_message_bodies = ['q1_message+%s' % (i,) 2061 for i in pika.compat.xrange(100)] 2062 for message_body in q1_tx_message_bodies: 2063 ch.basic_publish(exg_name, q1_routing_key, body=message_body, mandatory=True) 2064 2065 q2_tx_message_bodies = ['q2_message+%s' % (i,) 2066 for i in pika.compat.xrange(150)] 2067 for message_body in q2_tx_message_bodies: 2068 ch.basic_publish(exg_name, q2_routing_key, body=message_body, mandatory=True) 2069 2070 # Create the consumers 2071 q1_rx_messages = [] 2072 q1_consumer_tag = ch.basic_consume( 2073 q1_name, 2074 lambda *args: q1_rx_messages.append(args), 2075 auto_ack=False, 2076 exclusive=False, 2077 arguments=None) 2078 2079 q2_rx_messages = [] 2080 q2_consumer_tag = ch.basic_consume( 2081 q2_name, 2082 lambda *args: q2_rx_messages.append(args), 2083 auto_ack=False, 2084 exclusive=False, 2085 arguments=None) 2086 2087 # Wait for all messages to be delivered 2088 while (len(q1_rx_messages) < len(q1_tx_message_bodies) or 2089 len(q2_rx_messages) < len(q2_tx_message_bodies)): 2090 connection.process_data_events(time_limit=None) 2091 2092 self.assertEqual(len(q2_rx_messages), len(q2_tx_message_bodies)) 2093 2094 # Verify the messages 2095 def validate_messages(rx_messages, 2096 routing_key, 2097 consumer_tag, 2098 tx_message_bodies): 2099 self.assertEqual(len(rx_messages), len(tx_message_bodies)) 2100 2101 for msg, expected_body in zip(rx_messages, tx_message_bodies): 2102 self.assertIsInstance(msg, tuple) 2103 rx_ch, rx_method, rx_properties, rx_body = msg 2104 self.assertIs(rx_ch, ch) 2105 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 2106 self.assertEqual(rx_method.consumer_tag, consumer_tag) 2107 self.assertFalse(rx_method.redelivered) 2108 self.assertEqual(rx_method.exchange, exg_name) 2109 self.assertEqual(rx_method.routing_key, routing_key) 2110 2111 self.assertIsInstance(rx_properties, pika.BasicProperties) 2112 self.assertEqual(rx_body, as_bytes(expected_body)) 2113 2114 # Validate q1 consumed messages 2115 validate_messages(rx_messages=q1_rx_messages, 2116 routing_key=q1_routing_key, 2117 consumer_tag=q1_consumer_tag, 2118 tx_message_bodies=q1_tx_message_bodies) 2119 2120 # Validate q2 consumed messages 2121 validate_messages(rx_messages=q2_rx_messages, 2122 routing_key=q2_routing_key, 2123 consumer_tag=q2_consumer_tag, 2124 tx_message_bodies=q2_tx_message_bodies) 2125 2126 # There shouldn't be any more events now 2127 self.assertFalse(ch._pending_events) 2128 2129 2130class TestBasicCancelPurgesPendingConsumerCancellationEvt(BlockingTestCaseBase): 2131 2132 def test(self): 2133 """BlockingChannel.basic_cancel purges pending _ConsumerCancellationEvt""" # pylint: disable=C0301 2134 connection = self._connect() 2135 2136 ch = connection.channel() 2137 2138 q_name = ('TestBasicCancelPurgesPendingConsumerCancellationEvt_q' + 2139 uuid.uuid1().hex) 2140 2141 ch.queue_declare(q_name) 2142 2143 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2144 2145 ch.basic_publish('', routing_key=q_name, body='via-publish', mandatory=True) 2146 2147 # Create a consumer. Not passing a 'callback' to test client-generated 2148 # consumer tags 2149 rx_messages = [] 2150 consumer_tag = ch.basic_consume( 2151 q_name, 2152 lambda *args: rx_messages.append(args)) 2153 2154 # Wait for the published message to arrive, but don't consume it 2155 while not ch._pending_events: 2156 # Issue synchronous command that forces processing of incoming I/O 2157 connection.channel().close() 2158 2159 self.assertEqual(len(ch._pending_events), 1) 2160 self.assertIsInstance(ch._pending_events[0], 2161 blocking_connection._ConsumerDeliveryEvt) 2162 2163 # Delete the queue and wait for broker-initiated consumer cancellation 2164 ch.queue_delete(q_name) 2165 while len(ch._pending_events) < 2: 2166 # Issue synchronous command that forces processing of incoming I/O 2167 connection.channel().close() 2168 2169 self.assertEqual(len(ch._pending_events), 2) 2170 self.assertIsInstance(ch._pending_events[1], 2171 blocking_connection._ConsumerCancellationEvt) 2172 2173 # Issue consumer cancellation and verify that the pending 2174 # _ConsumerCancellationEvt instance was removed 2175 messages = ch.basic_cancel(consumer_tag) 2176 self.assertEqual(messages, []) 2177 2178 self.assertEqual(len(ch._pending_events), 0) 2179 2180 2181class TestBasicPublishWithoutPubacks(BlockingTestCaseBase): 2182 2183 def test(self): # pylint: disable=R0914,R0915 2184 """BlockingChannel.basic_publish without pubacks""" 2185 connection = self._connect() 2186 2187 ch = connection.channel() 2188 2189 q_name = 'TestBasicPublishWithoutPubacks_q' + uuid.uuid1().hex 2190 exg_name = 'TestBasicPublishWithoutPubacks_exg_' + uuid.uuid1().hex 2191 routing_key = 'TestBasicPublishWithoutPubacks' 2192 2193 # Declare a new exchange 2194 ch.exchange_declare(exg_name, exchange_type='direct') 2195 self.addCleanup(connection.channel().exchange_delete, exg_name) 2196 2197 # Declare a new queue 2198 ch.queue_declare(q_name, auto_delete=True) 2199 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2200 2201 # Bind the queue to the exchange using routing key 2202 ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) 2203 2204 # Deposit a message in the queue with mandatory=True 2205 msg1_headers = dict( 2206 test_name='TestBasicPublishWithoutPubacks') 2207 msg1_properties = pika.spec.BasicProperties(headers=msg1_headers) 2208 ch.basic_publish(exg_name, routing_key=routing_key, 2209 body='via-basic_publish_mandatory=True', 2210 properties=msg1_properties, 2211 mandatory=True) 2212 2213 # Deposit a message in the queue with mandatory=False 2214 ch.basic_publish(exg_name, routing_key=routing_key, 2215 body='via-basic_publish_mandatory=False', 2216 mandatory=False) 2217 2218 # Wait for the messages to arrive in queue 2219 self._assert_exact_message_count_with_retries(channel=ch, 2220 queue=q_name, 2221 expected_count=2) 2222 2223 # Create a consumer. Not passing a 'callback' to test client-generated 2224 # consumer tags 2225 rx_messages = [] 2226 consumer_tag = ch.basic_consume( 2227 q_name, 2228 lambda *args: rx_messages.append(args), 2229 auto_ack=False, 2230 exclusive=False, 2231 arguments=None) 2232 2233 # Wait for first message to arrive 2234 while not rx_messages: 2235 connection.process_data_events(time_limit=None) 2236 2237 self.assertGreaterEqual(len(rx_messages), 1) 2238 2239 # Check the first message 2240 msg = rx_messages[0] 2241 self.assertIsInstance(msg, tuple) 2242 rx_ch, rx_method, rx_properties, rx_body = msg 2243 self.assertIs(rx_ch, ch) 2244 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 2245 self.assertEqual(rx_method.consumer_tag, consumer_tag) 2246 self.assertEqual(rx_method.delivery_tag, 1) 2247 self.assertFalse(rx_method.redelivered) 2248 self.assertEqual(rx_method.exchange, exg_name) 2249 self.assertEqual(rx_method.routing_key, routing_key) 2250 2251 self.assertIsInstance(rx_properties, pika.BasicProperties) 2252 self.assertEqual(rx_properties.headers, msg1_headers) 2253 self.assertEqual(rx_body, as_bytes('via-basic_publish_mandatory=True')) 2254 2255 # There shouldn't be any more events now 2256 self.assertFalse(ch._pending_events) 2257 2258 # Ack the message so that the next one can arrive (we configured QoS 2259 # with prefetch_count=1) 2260 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 2261 2262 # Get the second message 2263 while len(rx_messages) < 2: 2264 connection.process_data_events(time_limit=None) 2265 2266 self.assertEqual(len(rx_messages), 2) 2267 2268 msg = rx_messages[1] 2269 self.assertIsInstance(msg, tuple) 2270 rx_ch, rx_method, rx_properties, rx_body = msg 2271 self.assertIs(rx_ch, ch) 2272 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 2273 self.assertEqual(rx_method.consumer_tag, consumer_tag) 2274 self.assertEqual(rx_method.delivery_tag, 2) 2275 self.assertFalse(rx_method.redelivered) 2276 self.assertEqual(rx_method.exchange, exg_name) 2277 self.assertEqual(rx_method.routing_key, routing_key) 2278 2279 self.assertIsInstance(rx_properties, pika.BasicProperties) 2280 self.assertEqual(rx_body, as_bytes('via-basic_publish_mandatory=False')) 2281 2282 # There shouldn't be any more events now 2283 self.assertFalse(ch._pending_events) 2284 2285 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 2286 2287 # Verify that the queue is now empty 2288 self._assert_exact_message_count_with_retries(channel=ch, 2289 queue=q_name, 2290 expected_count=0) 2291 2292 # Attempt to consume again with a short timeout 2293 connection.process_data_events(time_limit=0.005) 2294 self.assertEqual(len(rx_messages), 2) 2295 2296 2297class TestPublishFromBasicConsumeCallback(BlockingTestCaseBase): 2298 2299 def test(self): 2300 """BlockingChannel.basic_publish from basic_consume callback 2301 """ 2302 connection = self._connect() 2303 2304 ch = connection.channel() 2305 2306 src_q_name = ( 2307 'TestPublishFromBasicConsumeCallback_src_q' + uuid.uuid1().hex) 2308 dest_q_name = ( 2309 'TestPublishFromBasicConsumeCallback_dest_q' + uuid.uuid1().hex) 2310 2311 # Place channel in publisher-acknowledgments mode so that publishing 2312 # with mandatory=True will be synchronous 2313 ch.confirm_delivery() 2314 2315 # Declare source and destination queues 2316 ch.queue_declare(src_q_name, auto_delete=True) 2317 self.addCleanup(lambda: self._connect().channel().queue_delete(src_q_name)) 2318 ch.queue_declare(dest_q_name, auto_delete=True) 2319 self.addCleanup(lambda: self._connect().channel().queue_delete(dest_q_name)) 2320 2321 # Deposit a message in the source queue 2322 ch.basic_publish('', 2323 routing_key=src_q_name, 2324 body='via-publish', 2325 mandatory=True) 2326 2327 # Create a consumer 2328 def on_consume(channel, method, props, body): 2329 channel.basic_publish( 2330 '', routing_key=dest_q_name, body=body, 2331 properties=props, mandatory=True) 2332 channel.basic_ack(method.delivery_tag) 2333 2334 ch.basic_consume(src_q_name, 2335 on_consume, 2336 auto_ack=False, 2337 exclusive=False, 2338 arguments=None) 2339 2340 # Consume from destination queue 2341 for _, _, rx_body in ch.consume(dest_q_name, auto_ack=True): 2342 self.assertEqual(rx_body, as_bytes('via-publish')) 2343 break 2344 else: 2345 self.fail('failed to consume a messages from destination q') 2346 2347 2348class TestStopConsumingFromBasicConsumeCallback(BlockingTestCaseBase): 2349 2350 def test(self): 2351 """BlockingChannel.stop_consuming from basic_consume callback 2352 """ 2353 connection = self._connect() 2354 2355 ch = connection.channel() 2356 2357 q_name = ( 2358 'TestStopConsumingFromBasicConsumeCallback_q' + uuid.uuid1().hex) 2359 2360 # Place channel in publisher-acknowledgments mode so that publishing 2361 # with mandatory=True will be synchronous 2362 ch.confirm_delivery() 2363 2364 # Declare the queue 2365 ch.queue_declare(q_name, auto_delete=False) 2366 self.addCleanup(connection.channel().queue_delete, q_name) 2367 2368 # Deposit two messages in the queue 2369 ch.basic_publish('', 2370 routing_key=q_name, 2371 body='via-publish1', 2372 mandatory=True) 2373 2374 ch.basic_publish('', 2375 routing_key=q_name, 2376 body='via-publish2', 2377 mandatory=True) 2378 2379 # Create a consumer 2380 def on_consume(channel, method, props, body): # pylint: disable=W0613 2381 channel.stop_consuming() 2382 channel.basic_ack(method.delivery_tag) 2383 2384 ch.basic_consume(q_name, 2385 on_consume, 2386 auto_ack=False, 2387 exclusive=False, 2388 arguments=None) 2389 2390 ch.start_consuming() 2391 2392 ch.close() 2393 2394 ch = connection.channel() 2395 2396 # Verify that only the second message is present in the queue 2397 _, _, rx_body = ch.basic_get(q_name) 2398 self.assertEqual(rx_body, as_bytes('via-publish2')) 2399 2400 msg = ch.basic_get(q_name) 2401 self.assertTupleEqual(msg, (None, None, None)) 2402 2403 2404class TestCloseChannelFromBasicConsumeCallback(BlockingTestCaseBase): 2405 2406 def test(self): 2407 """BlockingChannel.close from basic_consume callback 2408 """ 2409 connection = self._connect() 2410 2411 ch = connection.channel() 2412 2413 q_name = ( 2414 'TestCloseChannelFromBasicConsumeCallback_q' + uuid.uuid1().hex) 2415 2416 # Place channel in publisher-acknowledgments mode so that publishing 2417 # with mandatory=True will be synchronous 2418 ch.confirm_delivery() 2419 2420 # Declare the queue 2421 ch.queue_declare(q_name, auto_delete=False) 2422 self.addCleanup(connection.channel().queue_delete, q_name) 2423 2424 # Deposit two messages in the queue 2425 ch.basic_publish('', 2426 routing_key=q_name, 2427 body='via-publish1', 2428 mandatory=True) 2429 2430 ch.basic_publish('', 2431 routing_key=q_name, 2432 body='via-publish2', 2433 mandatory=True) 2434 2435 # Create a consumer 2436 def on_consume(channel, method, props, body): # pylint: disable=W0613 2437 channel.close() 2438 2439 ch.basic_consume(q_name, 2440 on_consume, 2441 auto_ack=False, 2442 exclusive=False, 2443 arguments=None) 2444 2445 ch.start_consuming() 2446 2447 self.assertTrue(ch.is_closed) 2448 2449 2450 # Verify that both messages are present in the queue 2451 ch = connection.channel() 2452 _, _, rx_body = ch.basic_get(q_name) 2453 self.assertEqual(rx_body, as_bytes('via-publish1')) 2454 _, _, rx_body = ch.basic_get(q_name) 2455 self.assertEqual(rx_body, as_bytes('via-publish2')) 2456 2457 2458class TestCloseConnectionFromBasicConsumeCallback(BlockingTestCaseBase): 2459 2460 def test(self): 2461 """BlockingConnection.close from basic_consume callback 2462 """ 2463 connection = self._connect() 2464 2465 ch = connection.channel() 2466 2467 q_name = ( 2468 'TestCloseConnectionFromBasicConsumeCallback_q' + uuid.uuid1().hex) 2469 2470 # Place channel in publisher-acknowledgments mode so that publishing 2471 # with mandatory=True will be synchronous 2472 ch.confirm_delivery() 2473 2474 # Declare the queue 2475 ch.queue_declare(q_name, auto_delete=False) 2476 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2477 2478 # Deposit two messages in the queue 2479 ch.basic_publish('', 2480 routing_key=q_name, 2481 body='via-publish1', 2482 mandatory=True) 2483 2484 ch.basic_publish('', 2485 routing_key=q_name, 2486 body='via-publish2', 2487 mandatory=True) 2488 2489 # Create a consumer 2490 def on_consume(channel, method, props, body): # pylint: disable=W0613 2491 connection.close() 2492 2493 ch.basic_consume(q_name, 2494 on_consume, 2495 auto_ack=False, 2496 exclusive=False, 2497 arguments=None) 2498 2499 ch.start_consuming() 2500 2501 self.assertTrue(ch.is_closed) 2502 self.assertTrue(connection.is_closed) 2503 2504 2505 # Verify that both messages are present in the queue 2506 ch = self._connect().channel() 2507 _, _, rx_body = ch.basic_get(q_name) 2508 self.assertEqual(rx_body, as_bytes('via-publish1')) 2509 _, _, rx_body = ch.basic_get(q_name) 2510 self.assertEqual(rx_body, as_bytes('via-publish2')) 2511 2512 2513class TestStartConsumingRaisesChannelClosedOnSameChannelFailure(BlockingTestCaseBase): 2514 2515 def test(self): 2516 """start_consuming() exits with ChannelClosed exception on same channel failure 2517 """ 2518 connection = self._connect() 2519 2520 # Fail test if exception leaks back ito I/O loop 2521 self._instrument_io_loop_exception_leak_detection(connection) 2522 2523 ch = connection.channel() 2524 2525 q_name = ( 2526 'TestStartConsumingPassesChannelClosedOnSameChannelFailure_q' + 2527 uuid.uuid1().hex) 2528 2529 # Declare the queue 2530 ch.queue_declare(q_name, auto_delete=False) 2531 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2532 2533 ch.basic_consume(q_name, 2534 lambda *args, **kwargs: None, 2535 auto_ack=False, 2536 exclusive=False, 2537 arguments=None) 2538 2539 # Schedule a callback that will cause a channel error on the consumer's 2540 # channel by publishing to an unknown exchange. This will cause the 2541 # broker to close our channel. 2542 connection.add_callback_threadsafe( 2543 lambda: ch.basic_publish( 2544 exchange=q_name, 2545 routing_key='123', 2546 body=b'Nope this is wrong')) 2547 2548 with self.assertRaises(pika.exceptions.ChannelClosedByBroker): 2549 ch.start_consuming() 2550 2551 2552class TestStartConsumingReturnsAfterCancelFromBroker(BlockingTestCaseBase): 2553 2554 def test(self): 2555 """start_consuming() returns after Cancel from broker 2556 """ 2557 connection = self._connect() 2558 2559 ch = connection.channel() 2560 2561 q_name = ( 2562 'TestStartConsumingExitsOnCancelFromBroker_q' + uuid.uuid1().hex) 2563 2564 # Declare the queue 2565 ch.queue_declare(q_name, auto_delete=False) 2566 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2567 2568 consumer_tag = ch.basic_consume(q_name, 2569 lambda *args, **kwargs: None, 2570 auto_ack=False, 2571 exclusive=False, 2572 arguments=None) 2573 2574 # Schedule a callback that will run while start_consuming() is 2575 # executing and delete the queue. This will cause the broker to cancel 2576 # our consumer 2577 connection.add_callback_threadsafe( 2578 lambda: self._connect().channel().queue_delete(q_name)) 2579 2580 ch.start_consuming() 2581 2582 self.assertNotIn(consumer_tag, ch._consumer_infos) 2583 2584 2585class TestNonPubAckPublishAndConsumeHugeMessage(BlockingTestCaseBase): 2586 2587 def test(self): 2588 """BlockingChannel.publish/consume huge message""" 2589 connection = self._connect() 2590 2591 ch = connection.channel() 2592 2593 q_name = 'TestPublishAndConsumeHugeMessage_q' + uuid.uuid1().hex 2594 body = 'a' * 1000000 2595 2596 # Declare a new queue 2597 ch.queue_declare(q_name, auto_delete=False) 2598 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2599 2600 # Publish a message to the queue by way of default exchange 2601 ch.basic_publish(exchange='', routing_key=q_name, body=body) 2602 LOGGER.info('Published message body size=%s', len(body)) 2603 2604 # Consume the message 2605 for rx_method, rx_props, rx_body in ch.consume(q_name, auto_ack=False, 2606 exclusive=False, 2607 arguments=None): 2608 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 2609 self.assertEqual(rx_method.delivery_tag, 1) 2610 self.assertFalse(rx_method.redelivered) 2611 self.assertEqual(rx_method.exchange, '') 2612 self.assertEqual(rx_method.routing_key, q_name) 2613 2614 self.assertIsInstance(rx_props, pika.BasicProperties) 2615 self.assertEqual(rx_body, as_bytes(body)) 2616 2617 # Ack the message 2618 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 2619 2620 break 2621 2622 # There shouldn't be any more events now 2623 self.assertFalse(ch._queue_consumer_generator.pending_events) 2624 2625 # Verify that the queue is now empty 2626 ch.close() 2627 ch = connection.channel() 2628 self._assert_exact_message_count_with_retries(channel=ch, 2629 queue=q_name, 2630 expected_count=0) 2631 2632 2633class TestNonPubAckPublishAndConsumeManyMessages(BlockingTestCaseBase): 2634 2635 def test(self): 2636 """BlockingChannel non-pub-ack publish/consume many messages""" 2637 connection = self._connect() 2638 2639 ch = connection.channel() 2640 2641 q_name = ('TestNonPubackPublishAndConsumeManyMessages_q' + 2642 uuid.uuid1().hex) 2643 body = 'b' * 1024 2644 2645 num_messages_to_publish = 500 2646 2647 # Declare a new queue 2648 ch.queue_declare(q_name, auto_delete=False) 2649 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2650 2651 for _ in pika.compat.xrange(num_messages_to_publish): 2652 # Publish a message to the queue by way of default exchange 2653 ch.basic_publish(exchange='', routing_key=q_name, body=body) 2654 2655 # Consume the messages 2656 num_consumed = 0 2657 for rx_method, rx_props, rx_body in ch.consume(q_name, 2658 auto_ack=False, 2659 exclusive=False, 2660 arguments=None): 2661 num_consumed += 1 2662 self.assertIsInstance(rx_method, pika.spec.Basic.Deliver) 2663 self.assertEqual(rx_method.delivery_tag, num_consumed) 2664 self.assertFalse(rx_method.redelivered) 2665 self.assertEqual(rx_method.exchange, '') 2666 self.assertEqual(rx_method.routing_key, q_name) 2667 2668 self.assertIsInstance(rx_props, pika.BasicProperties) 2669 self.assertEqual(rx_body, as_bytes(body)) 2670 2671 # Ack the message 2672 ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False) 2673 2674 if num_consumed >= num_messages_to_publish: 2675 break 2676 2677 # There shouldn't be any more events now 2678 self.assertFalse(ch._queue_consumer_generator.pending_events) 2679 2680 ch.close() 2681 2682 self.assertIsNone(ch._queue_consumer_generator) 2683 2684 # Verify that the queue is now empty 2685 ch = connection.channel() 2686 self._assert_exact_message_count_with_retries(channel=ch, 2687 queue=q_name, 2688 expected_count=0) 2689 2690 2691class TestBasicCancelWithNonAckableConsumer(BlockingTestCaseBase): 2692 2693 def test(self): 2694 """BlockingChannel user cancels non-ackable consumer via basic_cancel""" 2695 connection = self._connect() 2696 2697 ch = connection.channel() 2698 2699 q_name = ( 2700 'TestBasicCancelWithNonAckableConsumer_q' + uuid.uuid1().hex) 2701 2702 body1 = 'a' * 1024 2703 body2 = 'b' * 2048 2704 2705 # Declare a new queue 2706 ch.queue_declare(q_name, auto_delete=False) 2707 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2708 2709 # Publish two messages to the queue by way of default exchange 2710 ch.basic_publish(exchange='', routing_key=q_name, body=body1) 2711 ch.basic_publish(exchange='', routing_key=q_name, body=body2) 2712 2713 # Wait for queue to contain both messages 2714 self._assert_exact_message_count_with_retries(channel=ch, 2715 queue=q_name, 2716 expected_count=2) 2717 2718 # Create a consumer that uses automatic ack mode 2719 consumer_tag = ch.basic_consume(q_name, lambda *x: None, auto_ack=True, 2720 exclusive=False, arguments=None) 2721 2722 # Wait for all messages to be sent by broker to client 2723 self._assert_exact_message_count_with_retries(channel=ch, 2724 queue=q_name, 2725 expected_count=0) 2726 2727 # Cancel the consumer 2728 messages = ch.basic_cancel(consumer_tag) 2729 2730 # Both messages should have been on their way when we cancelled 2731 self.assertEqual(len(messages), 2) 2732 2733 _, _, rx_body1 = messages[0] 2734 self.assertEqual(rx_body1, as_bytes(body1)) 2735 2736 _, _, rx_body2 = messages[1] 2737 self.assertEqual(rx_body2, as_bytes(body2)) 2738 2739 ch.close() 2740 2741 ch = connection.channel() 2742 2743 # Verify that the queue is now empty 2744 frame = ch.queue_declare(q_name, passive=True) 2745 self.assertEqual(frame.method.message_count, 0) 2746 2747 2748class TestBasicCancelWithAckableConsumer(BlockingTestCaseBase): 2749 2750 def test(self): 2751 """BlockingChannel user cancels ackable consumer via basic_cancel""" 2752 connection = self._connect() 2753 2754 ch = connection.channel() 2755 2756 q_name = ( 2757 'TestBasicCancelWithAckableConsumer_q' + uuid.uuid1().hex) 2758 2759 body1 = 'a' * 1024 2760 body2 = 'b' * 2048 2761 2762 # Declare a new queue 2763 ch.queue_declare(q_name, auto_delete=False) 2764 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2765 2766 # Publish two messages to the queue by way of default exchange 2767 ch.basic_publish(exchange='', routing_key=q_name, body=body1) 2768 ch.basic_publish(exchange='', routing_key=q_name, body=body2) 2769 2770 # Wait for queue to contain both messages 2771 self._assert_exact_message_count_with_retries(channel=ch, 2772 queue=q_name, 2773 expected_count=2) 2774 2775 # Create an ackable consumer 2776 consumer_tag = ch.basic_consume(q_name, lambda *x: None, auto_ack=False, 2777 exclusive=False, arguments=None) 2778 2779 # Wait for all messages to be sent by broker to client 2780 self._assert_exact_message_count_with_retries(channel=ch, 2781 queue=q_name, 2782 expected_count=0) 2783 2784 # Cancel the consumer 2785 messages = ch.basic_cancel(consumer_tag) 2786 2787 # Both messages should have been on their way when we cancelled 2788 self.assertEqual(len(messages), 0) 2789 2790 ch.close() 2791 2792 ch = connection.channel() 2793 2794 # Verify that canceling the ackable consumer restored both messages 2795 self._assert_exact_message_count_with_retries(channel=ch, 2796 queue=q_name, 2797 expected_count=2) 2798 2799 2800class TestUnackedMessageAutoRestoredToQueueOnChannelClose(BlockingTestCaseBase): 2801 2802 def test(self): 2803 """BlockingChannel unacked message restored to q on channel close """ 2804 connection = self._connect() 2805 2806 ch = connection.channel() 2807 2808 q_name = ('TestUnackedMessageAutoRestoredToQueueOnChannelClose_q' + 2809 uuid.uuid1().hex) 2810 2811 body1 = 'a' * 1024 2812 body2 = 'b' * 2048 2813 2814 # Declare a new queue 2815 ch.queue_declare(q_name, auto_delete=False) 2816 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2817 2818 # Publish two messages to the queue by way of default exchange 2819 ch.basic_publish(exchange='', routing_key=q_name, body=body1) 2820 ch.basic_publish(exchange='', routing_key=q_name, body=body2) 2821 2822 # Consume the events, but don't ack 2823 rx_messages = [] 2824 ch.basic_consume(q_name, lambda *args: rx_messages.append(args), 2825 auto_ack=False, exclusive=False, arguments=None) 2826 while len(rx_messages) != 2: 2827 connection.process_data_events(time_limit=None) 2828 2829 self.assertEqual(rx_messages[0][1].delivery_tag, 1) 2830 self.assertEqual(rx_messages[1][1].delivery_tag, 2) 2831 2832 # Verify no more ready messages in queue 2833 frame = ch.queue_declare(q_name, passive=True) 2834 self.assertEqual(frame.method.message_count, 0) 2835 2836 # Closing channel should restore messages back to queue 2837 ch.close() 2838 2839 # Verify that there are two messages in q now 2840 ch = connection.channel() 2841 2842 self._assert_exact_message_count_with_retries(channel=ch, 2843 queue=q_name, 2844 expected_count=2) 2845 2846 2847class TestNoAckMessageNotRestoredToQueueOnChannelClose(BlockingTestCaseBase): 2848 2849 def test(self): 2850 """BlockingChannel unacked message restored to q on channel close """ 2851 connection = self._connect() 2852 2853 ch = connection.channel() 2854 2855 q_name = ('TestNoAckMessageNotRestoredToQueueOnChannelClose_q' + 2856 uuid.uuid1().hex) 2857 2858 body1 = 'a' * 1024 2859 body2 = 'b' * 2048 2860 2861 # Declare a new queue 2862 ch.queue_declare(q_name, auto_delete=False) 2863 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2864 2865 # Publish two messages to the queue by way of default exchange 2866 ch.basic_publish(exchange='', routing_key=q_name, body=body1) 2867 ch.basic_publish(exchange='', routing_key=q_name, body=body2) 2868 2869 # Consume, but don't ack 2870 num_messages = 0 2871 for rx_method, _, _ in ch.consume(q_name, auto_ack=True, exclusive=False): 2872 num_messages += 1 2873 2874 self.assertEqual(rx_method.delivery_tag, num_messages) 2875 2876 if num_messages == 2: 2877 break 2878 else: 2879 self.fail('expected 2 messages, but consumed %i' % (num_messages,)) 2880 2881 # Verify no more ready messages in queue 2882 frame = ch.queue_declare(q_name, passive=True) 2883 self.assertEqual(frame.method.message_count, 0) 2884 2885 # Closing channel should not restore no-ack messages back to queue 2886 ch.close() 2887 2888 # Verify that there are no messages in q now 2889 ch = connection.channel() 2890 frame = ch.queue_declare(q_name, passive=True) 2891 self.assertEqual(frame.method.message_count, 0) 2892 2893 2894class TestConsumeGeneratorInactivityTimeout(BlockingTestCaseBase): 2895 2896 def test(self): 2897 """BlockingChannel consume returns 3-tuple of None values on inactivity timeout """ 2898 connection = self._connect() 2899 2900 ch = connection.channel() 2901 2902 q_name = ('TestConsumeGeneratorInactivityTimeout_q' + uuid.uuid1().hex) 2903 2904 # Declare a new queue 2905 ch.queue_declare(q_name, auto_delete=True) 2906 2907 # Expect to get only (None, None, None) upon inactivity timeout, since 2908 # there are no messages in queue 2909 for msg in ch.consume(q_name, inactivity_timeout=0.1): 2910 self.assertEqual(msg, (None, None, None)) 2911 break 2912 else: 2913 self.fail('expected (None, None, None), but iterator stopped') 2914 2915 2916class TestConsumeGeneratorInterruptedByCancelFromBroker(BlockingTestCaseBase): 2917 2918 def test(self): 2919 """BlockingChannel consume generator is interrupted broker's Cancel """ 2920 connection = self._connect() 2921 2922 self.assertTrue(connection.consumer_cancel_notify_supported) 2923 2924 ch = connection.channel() 2925 2926 q_name = ('TestConsumeGeneratorInterruptedByCancelFromBroker_q' + 2927 uuid.uuid1().hex) 2928 2929 # Declare a new queue 2930 ch.queue_declare(q_name, auto_delete=True) 2931 2932 queue_deleted = False 2933 for _ in ch.consume(q_name, auto_ack=False, inactivity_timeout=0.001): 2934 if not queue_deleted: 2935 # Delete the queue to force Basic.Cancel from the broker 2936 ch.queue_delete(q_name) 2937 queue_deleted = True 2938 2939 self.assertIsNone(ch._queue_consumer_generator) 2940 2941 2942class TestConsumeGeneratorCancelEncountersCancelFromBroker(BlockingTestCaseBase): 2943 2944 def test(self): 2945 """BlockingChannel consume generator cancel called when broker's Cancel is enqueued """ 2946 connection = self._connect() 2947 2948 self.assertTrue(connection.consumer_cancel_notify_supported) 2949 2950 ch = connection.channel() 2951 2952 q_name = ('TestConsumeGeneratorCancelEncountersCancelFromBroker_q' + 2953 uuid.uuid1().hex) 2954 2955 # Declare a new queue 2956 ch.queue_declare(q_name, auto_delete=True) 2957 2958 for _ in ch.consume(q_name, auto_ack=False, inactivity_timeout=0.001): 2959 # Delete the queue to force Basic.Cancel from the broker 2960 ch.queue_delete(q_name) 2961 2962 # Wait for server's Basic.Cancel 2963 while not ch._queue_consumer_generator.pending_events: 2964 connection.process_data_events() 2965 2966 # Confirm it's Basic.Cancel 2967 self.assertIsInstance(ch._queue_consumer_generator.pending_events[0], 2968 blocking_connection._ConsumerCancellationEvt) 2969 2970 # Now attempt to cancel the consumer generator 2971 ch.cancel() 2972 2973 self.assertIsNone(ch._queue_consumer_generator) 2974 2975 2976class TestConsumeGeneratorPassesChannelClosedOnSameChannelFailure(BlockingTestCaseBase): 2977 2978 def test(self): 2979 """consume() exits with ChannelClosed exception on same channel failure 2980 """ 2981 connection = self._connect() 2982 2983 # Fail test if exception leaks back ito I/O loop 2984 self._instrument_io_loop_exception_leak_detection(connection) 2985 2986 ch = connection.channel() 2987 2988 q_name = ( 2989 'TestConsumeGeneratorPassesChannelClosedOnSameChannelFailure_q' + 2990 uuid.uuid1().hex) 2991 2992 # Declare the queue 2993 ch.queue_declare(q_name, auto_delete=False) 2994 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 2995 2996 # Schedule a callback that will cause a channel error on the consumer's 2997 # channel by publishing to an unknown exchange. This will cause the 2998 # broker to close our channel. 2999 connection.add_callback_threadsafe( 3000 lambda: ch.basic_publish( 3001 exchange=q_name, 3002 routing_key='123', 3003 body=b'Nope this is wrong')) 3004 3005 with self.assertRaises(pika.exceptions.ChannelClosedByBroker): 3006 for _ in ch.consume(q_name): 3007 pass 3008 3009 3010class TestChannelFlow(BlockingTestCaseBase): 3011 3012 def test(self): 3013 """BlockingChannel Channel.Flow activate and deactivate """ 3014 connection = self._connect() 3015 3016 ch = connection.channel() 3017 3018 q_name = ('TestChannelFlow_q' + uuid.uuid1().hex) 3019 3020 # Declare a new queue 3021 ch.queue_declare(q_name, auto_delete=False) 3022 self.addCleanup(lambda: self._connect().channel().queue_delete(q_name)) 3023 3024 # Verify zero active consumers on the queue 3025 frame = ch.queue_declare(q_name, passive=True) 3026 self.assertEqual(frame.method.consumer_count, 0) 3027 3028 # Create consumer 3029 ch.basic_consume(q_name, lambda *args: None) 3030 3031 # Verify one active consumer on the queue now 3032 frame = ch.queue_declare(q_name, passive=True) 3033 self.assertEqual(frame.method.consumer_count, 1) 3034 3035 # Activate flow from default state (active by default) 3036 active = ch.flow(True) 3037 self.assertEqual(active, True) 3038 3039 # Verify still one active consumer on the queue now 3040 frame = ch.queue_declare(q_name, passive=True) 3041 self.assertEqual(frame.method.consumer_count, 1) 3042 3043 3044class TestChannelRaisesWrongStateWhenDeclaringQueueOnClosedChannel(BlockingTestCaseBase): 3045 def test(self): 3046 """BlockingConnection: Declaring queue on closed channel raises ChannelWrongStateError""" 3047 q_name = ( 3048 'TestChannelRaisesWrongStateWhenDeclaringQueueOnClosedChannel_q' + 3049 uuid.uuid1().hex) 3050 3051 channel = self._connect().channel() 3052 channel.close() 3053 with self.assertRaises(pika.exceptions.ChannelWrongStateError): 3054 channel.queue_declare(q_name) 3055 3056 3057class TestChannelRaisesWrongStateWhenClosingClosedChannel(BlockingTestCaseBase): 3058 def test(self): 3059 """BlockingConnection: Closing closed channel raises ChannelWrongStateError""" 3060 channel = self._connect().channel() 3061 channel.close() 3062 with self.assertRaises(pika.exceptions.ChannelWrongStateError): 3063 channel.close() 3064 3065 3066class TestChannelContextManagerClosesChannel(BlockingTestCaseBase): 3067 def test(self): 3068 """BlockingConnection: chanel context manager exit survives closed channel""" 3069 with self._connect().channel() as channel: 3070 self.assertTrue(channel.is_open) 3071 3072 self.assertTrue(channel.is_closed) 3073 3074 3075class TestChannelContextManagerExitSurvivesClosedChannel(BlockingTestCaseBase): 3076 def test(self): 3077 """BlockingConnection: chanel context manager exit survives closed channel""" 3078 with self._connect().channel() as channel: 3079 self.assertTrue(channel.is_open) 3080 channel.close() 3081 self.assertTrue(channel.is_closed) 3082 3083 self.assertTrue(channel.is_closed) 3084 3085 3086class TestChannelContextManagerDoesNotSuppressChannelClosedByBroker( 3087 BlockingTestCaseBase): 3088 def test(self): 3089 """BlockingConnection: chanel context manager doesn't suppress ChannelClosedByBroker exception""" 3090 3091 exg_name = ( 3092 "TestChannelContextManagerDoesNotSuppressChannelClosedByBroker" + 3093 uuid.uuid1().hex) 3094 3095 with self.assertRaises(pika.exceptions.ChannelClosedByBroker): 3096 with self._connect().channel() as channel: 3097 # Passively declaring non-existent exchange should force broker 3098 # to close channel 3099 channel.exchange_declare(exg_name, passive=True) 3100 3101 self.assertTrue(channel.is_closed) 3102 3103 3104if __name__ == '__main__': 3105 unittest.main() 3106