1""" 2Tests for pika.channel.Channel 3""" 4import collections 5import logging 6import sys 7import unittest 8import warnings 9 10try: 11 from unittest import mock # pylint: disable=C0412 12except ImportError: 13 import mock 14 15from pika import channel, connection, exceptions, frame, spec 16 17# Disable protected-access, missing-docstring, and invalid-name, 18# too-many-public-methods, too-many-lines 19# pylint: disable=W0212,C0111,C0103,R0904,C0302 20 21class ConnectionTemplate(connection.Connection): 22 """Template for using as mock spec_set for the pika Connection class. It 23 defines members accessed by the code under test that would be defined in 24 the base class's constructor. 25 """ 26 callbacks = None 27 28 # Suppress pylint warnings about specific abstract methods not being 29 # overridden 30 _adapter_connect_stream = connection.Connection._adapter_connect_stream 31 _adapter_disconnect_stream = connection.Connection._adapter_disconnect_stream 32 _adapter_emit_data = connection.Connection._adapter_emit_data 33 _adapter_call_later = connection.Connection._adapter_call_later 34 _adapter_remove_timeout = connection.Connection._adapter_remove_timeout 35 _adapter_add_callback_threadsafe = ( 36 connection.Connection._adapter_add_callback_threadsafe) 37 38 39class ChannelTests(unittest.TestCase): 40 @staticmethod 41 @mock.patch('pika.connection.Connection', autospec=ConnectionTemplate) 42 def _create_connection(connection_class_mock=None): 43 return connection_class_mock() 44 45 def setUp(self): 46 self.connection = self._create_connection() 47 self._on_openok_callback = mock.Mock() 48 self.obj = channel.Channel(self.connection, 1, 49 self._on_openok_callback) 50 warnings.resetwarnings() 51 52 def tearDown(self): 53 del self.connection 54 del self._on_openok_callback 55 del self.obj 56 warnings.resetwarnings() 57 58 def test_init_invalid_channel_number(self): 59 self.assertRaises(exceptions.InvalidChannelNumber, channel.Channel, 60 'Foo', self.connection, lambda *args: None) 61 62 def test_init_channel_number(self): 63 self.assertEqual(self.obj.channel_number, 1) 64 65 def test_init_callbacks(self): 66 self.assertEqual(self.obj.callbacks, self.connection.callbacks) 67 68 def test_init_connection(self): 69 self.assertEqual(self.obj.connection, self.connection) 70 71 def test_init_content_frame_assembler(self): 72 self.assertIsInstance(self.obj._content_assembler, 73 channel.ContentFrameAssembler) 74 75 def test_init_blocked(self): 76 self.assertIsInstance(self.obj._blocked, collections.deque) 77 78 def test_init_blocking(self): 79 self.assertEqual(self.obj._blocking, None) 80 81 def test_init_on_flowok_callback(self): 82 self.assertEqual(self.obj._on_flowok_callback, None) 83 84 def test_init_has_on_flow_callback(self): 85 self.assertEqual(self.obj._has_on_flow_callback, False) 86 87 def test_init_on_openok_callback(self): 88 self.assertEqual(self.obj._on_openok_callback, 89 self._on_openok_callback) 90 91 def test_init_state(self): 92 self.assertEqual(self.obj._state, channel.Channel.CLOSED) 93 94 def test_init_cancelled(self): 95 self.assertIsInstance(self.obj._cancelled, set) 96 97 def test_init_consumers(self): 98 self.assertEqual(self.obj._consumers, dict()) 99 100 def test_init_flow(self): 101 self.assertEqual(self.obj.flow_active, True) 102 103 def test_init_on_getok_callback(self): 104 self.assertEqual(self.obj._on_getok_callback, None) 105 106 def test_add_callback(self): 107 mock_callback = mock.Mock() 108 self.obj.add_callback(mock_callback, [spec.Basic.Qos]) 109 self.connection.callbacks.add.assert_called_once_with( 110 self.obj.channel_number, spec.Basic.Qos, mock_callback, True) 111 112 def test_add_callback_multiple_replies(self): 113 mock_callback = mock.Mock() 114 self.obj.add_callback(mock_callback, 115 [spec.Basic.Qos, spec.Basic.QosOk]) 116 calls = [ 117 mock.call(self.obj.channel_number, spec.Basic.Qos, mock_callback, 118 True), 119 mock.call(self.obj.channel_number, spec.Basic.QosOk, mock_callback, 120 True) 121 ] 122 self.connection.callbacks.add.assert_has_calls(calls) 123 124 def test_add_on_cancel_callback(self): 125 mock_callback = mock.Mock() 126 self.obj.add_on_cancel_callback(mock_callback) 127 self.connection.callbacks.add.assert_called_once_with( 128 self.obj.channel_number, spec.Basic.Cancel, mock_callback, False) 129 130 def test_add_on_close_callback(self): 131 mock_callback = mock.Mock() 132 self.obj.add_on_close_callback(mock_callback) 133 self.connection.callbacks.add.assert_called_once_with( 134 self.obj.channel_number, '_on_channel_close', mock_callback, False, 135 self.obj) 136 137 def test_add_on_flow_callback(self): 138 mock_callback = mock.Mock() 139 self.obj.add_on_flow_callback(mock_callback) 140 self.connection.callbacks.add.assert_called_once_with( 141 self.obj.channel_number, spec.Channel.Flow, mock_callback, False) 142 143 def test_add_on_return_callback(self): 144 mock_callback = mock.Mock() 145 self.obj.add_on_return_callback(mock_callback) 146 self.connection.callbacks.add.assert_called_once_with( 147 self.obj.channel_number, '_on_return', mock_callback, False) 148 149 def test_basic_ack_channel_closed(self): 150 self.assertRaises(exceptions.ChannelWrongStateError, self.obj.basic_ack) 151 152 @mock.patch('pika.spec.Basic.Ack') 153 @mock.patch('pika.channel.Channel._send_method') 154 def test_basic_ack_calls_send_method(self, send_method, _unused): 155 self.obj._set_state(self.obj.OPEN) 156 self.obj.basic_ack(1, False) 157 send_method.assert_called_once_with(spec.Basic.Ack(1, False)) 158 159 def test_basic_cancel_asynch(self): 160 self.obj._set_state(self.obj.OPEN) 161 self.obj._consumers['ctag0'] = logging.debug 162 self.obj._rpc = mock.Mock(wraps=self.obj._rpc) 163 self.obj.basic_cancel(consumer_tag='ctag0') 164 165 self.assertTrue(self.obj._rpc.called) 166 self.assertFalse(self.obj.callbacks.add.called) 167 168 def test_basic_cancel_asynch_with_user_callback_raises_value_error(self): 169 self.obj._set_state(self.obj.OPEN) 170 consumer_tag = 'ctag0' 171 callback_mock = mock.Mock() 172 self.obj._consumers[consumer_tag] = callback_mock 173 self.assertRaises( 174 TypeError, 175 self.obj.basic_cancel, 176 consumer_tag, 177 callback='bad-callback') 178 179 @mock.patch('pika.channel.Channel._raise_if_not_open') 180 def test_basic_cancel_calls_raise_if_not_open(self, raise_if_not_open): 181 self.obj._set_state(self.obj.OPEN) 182 consumer_tag = 'ctag0' 183 callback_mock = mock.Mock() 184 self.obj._consumers[consumer_tag] = mock.Mock() 185 186 self.obj.basic_cancel(consumer_tag, callback=callback_mock) 187 188 raise_if_not_open.assert_called_once_with() 189 190 def test_basic_cancel_synch(self): 191 self.obj._set_state(self.obj.OPEN) 192 consumer_tag = 'ctag0' 193 callback_mock = mock.Mock() 194 self.obj._consumers[consumer_tag] = mock.Mock() 195 196 self.obj.basic_cancel(consumer_tag, callback=callback_mock) 197 198 # Verify consumer tag added to the cancelled list 199 self.assertListEqual(list(self.obj._cancelled), [consumer_tag]) 200 # Verify user completion callback registered 201 self.obj.callbacks.add.assert_any_call( 202 self.obj.channel_number, spec.Basic.CancelOk, callback_mock) 203 # Verify Channel._on_cancelok callback registered 204 self.obj.callbacks.add.assert_any_call( 205 self.obj.channel_number, 206 spec.Basic.CancelOk, 207 self.obj._on_cancelok, 208 arguments={ 209 'consumer_tag': 'ctag0' 210 }) 211 # Verify Channel._on_synchronous_complete callback registered 212 self.obj.callbacks.add.assert_any_call( 213 self.obj.channel_number, 214 spec.Basic.CancelOk, 215 self.obj._on_synchronous_complete, 216 arguments={ 217 'consumer_tag': 'ctag0' 218 }) 219 220 def test_basic_cancel_synch_no_user_callback_raises_value_error(self): 221 self.obj._set_state(self.obj.OPEN) 222 self.obj._consumers['ctag0'] = mock.Mock() 223 224 with self.assertRaises(TypeError): 225 self.obj.basic_cancel(consumer_tag='ctag0', callback='bad-callback') 226 227 # Verify arg error detected raised without making state changes 228 self.assertIn('ctag0', self.obj._consumers) 229 self.assertEqual(len(self.obj._cancelled), 0) 230 231 def test_basic_cancel_then_close(self): 232 self.obj._set_state(self.obj.OPEN) 233 callback_mock = mock.Mock() 234 consumer_tag = 'ctag0' 235 self.obj._consumers[consumer_tag] = mock.Mock() 236 self.obj.basic_cancel(consumer_tag, callback=callback_mock) 237 try: 238 self.obj.close() 239 except exceptions.ChannelClosed: 240 self.fail('unable to cancel consumers as channel is closing') 241 self.assertTrue(self.obj.is_closing) 242 243 @mock.patch('pika.channel.Channel._rpc') 244 def test_basic_cancel_unknown_consumer_tag(self, rpc): 245 self.obj._set_state(self.obj.OPEN) 246 callback_mock = mock.Mock() 247 consumer_tag = 'ctag0' 248 self.obj.basic_cancel(consumer_tag, callback=callback_mock) 249 self.assertFalse(rpc.called) 250 251 def test_basic_consume_legacy_parameter_queue(self): 252 # This is for the unlikely scenario where only 253 # the first parameter is updated 254 self.obj._set_state(self.obj.OPEN) 255 with self.assertRaises(TypeError): 256 self.obj.basic_consume('queue', 257 'whoops this should be a callback') 258 259 def test_basic_consume_legacy_parameter_callback(self): 260 self.obj._set_state(self.obj.OPEN) 261 callback_mock = mock.Mock() 262 with self.assertRaises(TypeError): 263 self.obj.basic_consume(callback_mock, 'queue') 264 265 def test_queue_declare_legacy_parameter_callback(self): 266 self.obj._set_state(self.obj.OPEN) 267 callback_mock = mock.Mock() 268 with self.assertRaises(TypeError): 269 self.obj.queue_declare(callback_mock, 'queue') 270 271 def test_exchange_declare_legacy_parameter_callback(self): 272 self.obj._set_state(self.obj.OPEN) 273 callback_mock = mock.Mock() 274 with self.assertRaises(TypeError): 275 self.obj.exchange_declare(callback_mock, 'exchange') 276 277 def test_queue_bind_legacy_parameter_callback(self): 278 self.obj._set_state(self.obj.OPEN) 279 callback_mock = mock.Mock() 280 with self.assertRaises(TypeError): 281 self.obj.queue_bind(callback_mock, 282 'queue', 283 'exchange') 284 285 def test_basic_cancel_legacy_parameter(self): 286 self.obj._set_state(self.obj.OPEN) 287 callback_mock = mock.Mock() 288 with self.assertRaises(TypeError): 289 self.obj.basic_cancel(callback_mock, 'tag') 290 291 def test_basic_get_legacy_parameter(self): 292 self.obj._set_state(self.obj.OPEN) 293 callback_mock = mock.Mock() 294 with self.assertRaises(TypeError): 295 self.obj.basic_get(callback_mock) 296 297 def test_basic_qos_legacy_parameter(self): 298 self.obj._set_state(self.obj.OPEN) 299 callback_mock = mock.Mock() 300 with self.assertRaises(TypeError): 301 self.obj.basic_get(callback_mock, 0, 0, False) 302 303 def test_basic_recover_legacy_parameter(self): 304 self.obj._set_state(self.obj.OPEN) 305 callback_mock = mock.Mock() 306 with self.assertRaises(TypeError): 307 self.obj.basic_recover(callback_mock, True) 308 309 def test_confirm_delivery_legacy_no_parameters(self): 310 self.obj._set_state(self.obj.OPEN) 311 with self.assertRaises(TypeError): 312 self.obj.confirm_delivery() 313 314 def test_confirm_delivery_legacy_nowait_parameter(self): 315 self.obj._set_state(self.obj.OPEN) 316 callback_mock = mock.Mock() 317 with self.assertRaises(TypeError): 318 self.obj.confirm_delivery(callback_mock, True) 319 320 def test_exchange_bind_legacy_parameter(self): 321 self.obj._set_state(self.obj.OPEN) 322 callback_mock = mock.Mock() 323 with self.assertRaises(TypeError): 324 self.obj.exchange_bind(callback_mock, 325 'destination', 326 'source', 327 'routing_key', 328 True) 329 330 def test_exchange_delete_legacy_parameter(self): 331 self.obj._set_state(self.obj.OPEN) 332 callback_mock = mock.Mock() 333 with self.assertRaises(TypeError): 334 self.obj.exchange_delete(callback_mock, 335 'exchange', 336 True) 337 338 def test_exchange_unbind_legacy_parameter(self): 339 self.obj._set_state(self.obj.OPEN) 340 callback_mock = mock.Mock() 341 with self.assertRaises(TypeError): 342 self.obj.exchange_unbind(callback_mock, 343 'destination', 344 'source', 345 'routing_key', 346 True) 347 348 def test_flow_legacy_parameter(self): 349 self.obj._set_state(self.obj.OPEN) 350 callback_mock = mock.Mock() 351 with self.assertRaises(TypeError): 352 self.obj.flow(callback_mock, True) 353 354 def test_queue_delete_legacy_parameter(self): 355 self.obj._set_state(self.obj.OPEN) 356 callback_mock = mock.Mock() 357 with self.assertRaises(TypeError): 358 self.obj.queue_delete(callback_mock, 359 'queue', 360 True, 361 True) 362 363 def test_queue_unbind_legacy_parameter(self): 364 self.obj._set_state(self.obj.OPEN) 365 callback_mock = mock.Mock() 366 with self.assertRaises(TypeError): 367 self.obj.queue_unbind(callback_mock, 368 'queue', 369 'exchange', 370 'routing_key') 371 372 def test_queue_purge_legacy_parameter(self): 373 self.obj._set_state(self.obj.OPEN) 374 callback_mock = mock.Mock() 375 with self.assertRaises(TypeError): 376 self.obj.queue_purge(callback_mock, 377 'queue', 378 True) 379 380 def test_basic_consume_channel_closed(self): 381 mock_callback = mock.Mock() 382 mock_on_msg_callback = mock.Mock() 383 self.assertRaises(exceptions.ChannelWrongStateError, 384 self.obj.basic_consume, 385 'test-queue', mock_on_msg_callback, 386 callback=mock_callback) 387 388 @mock.patch('pika.channel.Channel._raise_if_not_open') 389 def test_basic_consume_calls_raise_if_not_open(self, raise_if_not_open): 390 self.obj._set_state(self.obj.OPEN) 391 mock_callback = mock.Mock() 392 mock_on_msg_callback = mock.Mock() 393 self.obj.basic_consume('test-queue', mock_on_msg_callback, callback=mock_callback) 394 raise_if_not_open.assert_called_once_with() 395 396 def test_basic_consume_consumer_tag_no_completion_callback(self): 397 self.obj._set_state(self.obj.OPEN) 398 expectation = 'ctag1.' 399 mock_on_msg_callback = mock.Mock() 400 consumer_tag = self.obj.basic_consume('test-queue', 401 mock_on_msg_callback)[:6] 402 self.assertEqual(consumer_tag, expectation) 403 404 def test_basic_consume_consumer_tag_with_completion_callback(self): 405 self.obj._set_state(self.obj.OPEN) 406 expectation = 'ctag1.' 407 mock_callback = mock.Mock() 408 mock_on_msg_callback = mock.Mock() 409 consumer_tag = self.obj.basic_consume('test-queue', 410 mock_on_msg_callback, 411 callback=mock_callback)[:6] 412 self.assertEqual(consumer_tag, expectation) 413 414 def test_basic_consume_consumer_tag_cancelled_full(self): 415 self.obj._set_state(self.obj.OPEN) 416 expectation = 'ctag1.' 417 mock_on_msg_callback = mock.Mock() 418 for ctag in ['ctag1.%i' % ii for ii in range(11)]: 419 self.obj._cancelled.add(ctag) 420 self.assertEqual( 421 self.obj.basic_consume('test-queue', mock_on_msg_callback)[:6], 422 expectation) 423 424 def test_basic_consume_consumer_tag_in_consumers(self): 425 self.obj._set_state(self.obj.OPEN) 426 consumer_tag = 'ctag1.0' 427 mock_on_msg_callback = mock.Mock() 428 mock_callback = mock.Mock() 429 self.obj.basic_consume( 430 'test-queue', mock_on_msg_callback, 431 consumer_tag=consumer_tag, callback=mock_callback) 432 self.assertIn(consumer_tag, self.obj._consumers) 433 434 def test_basic_consume_duplicate_consumer_tag_raises(self): 435 self.obj._set_state(self.obj.OPEN) 436 consumer_tag = 'ctag1.0' 437 mock_on_msg_callback = mock.Mock() 438 mock_callback = mock.Mock() 439 self.obj._consumers[consumer_tag] = logging.debug 440 self.assertRaises(exceptions.DuplicateConsumerTag, 441 self.obj.basic_consume, 'test-queue', 442 mock_on_msg_callback, False, False, 443 consumer_tag, None, mock_callback) 444 445 def test_basic_consume_consumers_callback_value(self): 446 self.obj._set_state(self.obj.OPEN) 447 consumer_tag = 'ctag1.0' 448 mock_on_msg_callback = mock.Mock() 449 self.obj.basic_consume( 450 'test-queue', mock_on_msg_callback, consumer_tag=consumer_tag) 451 self.assertEqual(self.obj._consumers[consumer_tag], mock_on_msg_callback) 452 453 @mock.patch('pika.spec.Basic.Consume') 454 @mock.patch('pika.channel.Channel._rpc') 455 def test_basic_consume_consumers_rpc_with_no_completion_callback(self, rpc, _unused): 456 self.obj._set_state(self.obj.OPEN) 457 consumer_tag = 'ctag1.0' 458 mock_on_msg_callback = mock.Mock() 459 self.obj.basic_consume( 460 'test-queue', mock_on_msg_callback, 461 consumer_tag=consumer_tag) 462 expectation = spec.Basic.Consume( 463 queue='test-queue', 464 consumer_tag=consumer_tag, 465 no_ack=False, 466 exclusive=False) 467 rpc.assert_called_once_with(expectation, self.obj._on_eventok, 468 [(spec.Basic.ConsumeOk, { 469 'consumer_tag': consumer_tag 470 })]) 471 472 @mock.patch('pika.spec.Basic.Consume') 473 @mock.patch('pika.channel.Channel._rpc') 474 def test_basic_consume_consumers_rpc_with_completion_callback(self, rpc, _unused): 475 self.obj._set_state(self.obj.OPEN) 476 consumer_tag = 'ctag1.0' 477 mock_on_msg_callback = mock.Mock() 478 mock_callback = mock.Mock() 479 self.obj.basic_consume( 480 'test-queue', mock_on_msg_callback, 481 consumer_tag=consumer_tag, callback=mock_callback) 482 expectation = spec.Basic.Consume( 483 queue='test-queue', 484 consumer_tag=consumer_tag, 485 no_ack=False, 486 exclusive=False) 487 rpc.assert_called_once_with(expectation, mock_callback, 488 [(spec.Basic.ConsumeOk, { 489 'consumer_tag': consumer_tag 490 })]) 491 492 def test_basic_get_requires_callback(self): 493 self.obj._set_state(self.obj.OPEN) 494 with self.assertRaises(TypeError): 495 self.obj.basic_get('test-queue', None) 496 497 @mock.patch('pika.channel.Channel._send_method') 498 def test_basic_get_callback(self, _unused): 499 self.obj._set_state(self.obj.OPEN) 500 mock_callback = mock.Mock() 501 self.obj.basic_get('test-queue', mock_callback) 502 self.assertEqual(self.obj._on_getok_callback, mock_callback) 503 504 @mock.patch('pika.spec.Basic.Get') 505 @mock.patch('pika.channel.Channel._send_method') 506 def test_basic_get_send_method_called(self, send_method, _unused): 507 self.obj._set_state(self.obj.OPEN) 508 mock_callback = mock.Mock() 509 self.obj.basic_get('test-queue', mock_callback) 510 send_method.assert_called_once_with( 511 spec.Basic.Get(queue='test-queue', no_ack=False)) 512 513 @mock.patch('pika.spec.Basic.Get') 514 @mock.patch('pika.channel.Channel._send_method') 515 def test_basic_get_send_method_called_auto_ack(self, send_method, _unused): 516 self.obj._set_state(self.obj.OPEN) 517 mock_callback = mock.Mock() 518 self.obj.basic_get('test-queue', mock_callback, auto_ack=True) 519 send_method.assert_called_once_with( 520 spec.Basic.Get(queue='test-queue', no_ack=True)) 521 522 def test_basic_nack_raises_channel_wrong_state(self): 523 self.assertRaises(exceptions.ChannelWrongStateError, 524 self.obj.basic_nack, 0, False, True) 525 526 @mock.patch('pika.spec.Basic.Nack') 527 @mock.patch('pika.channel.Channel._send_method') 528 def test_basic_nack_send_method_request(self, send_method, _unused): 529 self.obj._set_state(self.obj.OPEN) 530 self.obj.basic_nack(1, False, True) 531 send_method.assert_called_once_with(spec.Basic.Nack(1, False, True)) 532 533 def test_basic_publish_raises_channel_wrong_state(self): 534 self.assertRaises(exceptions.ChannelWrongStateError, 535 self.obj.basic_publish, 'foo', 'bar', 'baz') 536 537 @mock.patch('pika.spec.Basic.Publish') 538 @mock.patch('pika.channel.Channel._send_method') 539 def test_basic_publish_send_method_request(self, send_method, _unused): 540 self.obj._set_state(self.obj.OPEN) 541 exchange = 'basic_publish_test' 542 routing_key = 'routing-key-fun' 543 body = b'This is my body' 544 properties = spec.BasicProperties(content_type='text/plain') 545 mandatory = False 546 self.obj.basic_publish(exchange, routing_key, body, properties, 547 mandatory) 548 send_method.assert_called_once_with( 549 spec.Basic.Publish( 550 exchange=exchange, 551 routing_key=routing_key, 552 mandatory=mandatory), (properties, body)) 553 554 def test_basic_qos_raises_channel_wrong_state(self): 555 self.assertRaises(exceptions.ChannelWrongStateError, 556 self.obj.basic_qos, 0, False, True) 557 558 def test_basic_qos_invalid_prefetch_size_raises_error(self): 559 self.obj._set_state(self.obj.OPEN) 560 with self.assertRaises(ValueError) as ex: 561 self.obj.basic_qos('foo', 123) 562 self.assertEqual("invalid literal for int() with base 10: 'foo'", 563 ex.exception.args[0]) 564 with self.assertRaises(ValueError) as ex: 565 self.obj.basic_qos(-1, 123) 566 self.assertIn('prefetch_size', ex.exception.args[0]) 567 568 def test_basic_qos_invalid_prefetch_count_raises_error(self): 569 self.obj._set_state(self.obj.OPEN) 570 with self.assertRaises(ValueError) as ex: 571 self.obj.basic_qos(123, 'foo') 572 self.assertEqual("invalid literal for int() with base 10: 'foo'", 573 ex.exception.args[0]) 574 with self.assertRaises(ValueError) as ex: 575 self.obj.basic_qos(123, -1) 576 self.assertIn('prefetch_count', ex.exception.args[0]) 577 578 @mock.patch('pika.spec.Basic.Qos') 579 @mock.patch('pika.channel.Channel._rpc') 580 def test_basic_qos_rpc_request(self, rpc, _unused): 581 self.obj._set_state(self.obj.OPEN) 582 mock_callback = mock.Mock() 583 self.obj.basic_qos(10, 20, False, callback=mock_callback) 584 rpc.assert_called_once_with( 585 spec.Basic.Qos(10, 20, False), mock_callback, [spec.Basic.QosOk]) 586 587 def test_basic_reject_raises_channel_wrong_state(self): 588 self.assertRaises(exceptions.ChannelWrongStateError, 589 self.obj.basic_reject, 1, False) 590 591 @mock.patch('pika.spec.Basic.Reject') 592 @mock.patch('pika.channel.Channel._send_method') 593 def test_basic_reject_send_method_request_with_int_tag( 594 self, send_method, _unused): 595 self.obj._set_state(self.obj.OPEN) 596 self.obj.basic_reject(1, True) 597 send_method.assert_called_once_with(spec.Basic.Reject(1, True)) 598 599 def test_basic_reject_spec_with_int_tag(self): 600 decoded = spec.Basic.Reject() 601 decoded.decode(b''.join(spec.Basic.Reject(1, True).encode())) 602 603 self.assertEqual(decoded.delivery_tag, 1) 604 self.assertIs(decoded.requeue, True) 605 606 @mock.patch('pika.spec.Basic.Reject') 607 @mock.patch('pika.channel.Channel._send_method') 608 def test_basic_reject_send_method_request_with_long_tag( 609 self, send_method, _unused): 610 self.obj._set_state(self.obj.OPEN) 611 612 # NOTE: we use `sys.maxsize` for compatibility with python 3, which 613 # doesn't have `sys.maxint` 614 self.obj.basic_reject(sys.maxsize, True) 615 send_method.assert_called_once_with( 616 spec.Basic.Reject(sys.maxsize, True)) 617 618 def test_basic_reject_spec_with_long_tag(self): 619 # NOTE: we use `sys.maxsize` for compatibility with python 3, which 620 # doesn't have `sys.maxint` 621 decoded = spec.Basic.Reject() 622 decoded.decode(b''.join(spec.Basic.Reject(sys.maxsize, True).encode())) 623 624 self.assertEqual(decoded.delivery_tag, sys.maxsize) 625 self.assertIs(decoded.requeue, True) 626 627 def test_basic_recover_raises_channel_wrong_state(self): 628 self.assertRaises(exceptions.ChannelWrongStateError, 629 self.obj.basic_qos, 0, False, True) 630 631 @mock.patch('pika.spec.Basic.Recover') 632 @mock.patch('pika.channel.Channel._rpc') 633 def test_basic_recover_rpc_request(self, rpc, _unused): 634 self.obj._set_state(self.obj.OPEN) 635 mock_callback = mock.Mock() 636 self.obj.basic_recover(True, callback=mock_callback) 637 rpc.assert_called_once_with( 638 spec.Basic.Recover(True), mock_callback, [spec.Basic.RecoverOk]) 639 640 def test_close_in_closing_state_raises_channel_wrong_state(self): 641 self.obj._set_state(self.obj.CLOSING) 642 self.assertRaises(exceptions.ChannelWrongStateError, self.obj.close) 643 self.assertTrue(self.obj.is_closing) 644 645 def test_close_in_closed_state_raises_channel_wrong_state_and_stays_closed( 646 self): 647 self.assertTrue(self.obj.is_closed) 648 self.assertRaises(exceptions.ChannelWrongStateError, self.obj.close) 649 self.assertTrue(self.obj.is_closed) 650 651 @mock.patch('pika.spec.Channel.Close') 652 def test_close_in_opening_state(self, _unused): 653 self.obj._set_state(self.obj.OPENING) 654 self.obj._rpc = mock.Mock(wraps=self.obj._rpc) 655 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 656 657 # close() called by user 658 self.obj.close(200, 'Got to go') 659 660 self.obj._rpc.assert_called_once_with( 661 spec.Channel.Close(200, 'Got to go', 0, 0), self.obj._on_closeok, 662 [spec.Channel.CloseOk]) 663 664 self.assertEqual(self.obj._closing_reason.reply_code, 200) 665 self.assertEqual(self.obj._closing_reason.reply_text, 'Got to go') 666 self.assertEqual(self.obj._state, self.obj.CLOSING) 667 668 # OpenOk method from broker 669 self.obj._on_openok( 670 frame.Method(self.obj.channel_number, 671 spec.Channel.OpenOk(self.obj.channel_number))) 672 self.assertEqual(self.obj._state, self.obj.CLOSING) 673 self.assertEqual(self.obj.callbacks.process.call_count, 0) 674 675 # CloseOk method from broker 676 self.obj._on_closeok( 677 frame.Method(self.obj.channel_number, spec.Channel.CloseOk())) 678 self.assertEqual(self.obj._state, self.obj.CLOSED) 679 680 self.obj.callbacks.process.assert_any_call(self.obj.channel_number, 681 '_on_channel_close', 682 self.obj, self.obj, 683 mock.ANY) 684 685 self.assertEqual(self.obj._cleanup.call_count, 1) 686 687 def test_close_in_open_state_transitions_to_closing(self): 688 self.obj._set_state(self.obj.OPEN) 689 self.obj.close() 690 self.assertEqual(self.obj._state, channel.Channel.CLOSING) 691 692 def test_close_basic_cancel_called(self): 693 self.obj._set_state(self.obj.OPEN) 694 self.obj._consumers['abc'] = None 695 with mock.patch.object(self.obj, 'basic_cancel') as basic_cancel: 696 self.obj.close() 697 # this is actually not necessary but Pika currently cancels 698 # every consumer before closing the channel 699 basic_cancel.assert_called_once_with(consumer_tag='abc') 700 701 def test_confirm_delivery_with_bad_callback_raises_value_error(self): 702 self.assertRaises(ValueError, 703 self.obj.confirm_delivery, 704 'bad-callback') 705 706 def test_confirm_delivery_raises_channel_wrong_state(self): 707 cb = mock.Mock() 708 self.assertRaises(exceptions.ChannelWrongStateError, 709 self.obj.confirm_delivery, cb) 710 711 def test_confirm_delivery_raises_method_not_implemented_for_confirms(self): 712 self.obj._set_state(self.obj.OPEN) 713 # Since connection is a mock.Mock, overwrite the method def with False 714 self.obj.connection.publisher_confirms = False 715 self.assertRaises(exceptions.MethodNotImplemented, 716 self.obj.confirm_delivery, logging.debug) 717 718 def test_confirm_delivery_raises_method_not_implemented_for_nack(self): 719 self.obj._set_state(self.obj.OPEN) 720 # Since connection is a mock.Mock, overwrite the method def with False 721 self.obj.connection.basic_nack = False 722 self.assertRaises(exceptions.MethodNotImplemented, 723 self.obj.confirm_delivery, logging.debug) 724 725 def test_confirm_delivery_async(self): 726 self.obj._set_state(self.obj.OPEN) 727 user_ack_nack_callback = mock.Mock() 728 self.obj.confirm_delivery(ack_nack_callback=user_ack_nack_callback) 729 730 self.assertEqual(self.obj.callbacks.add.call_count, 2) 731 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 732 spec.Basic.Ack, 733 user_ack_nack_callback, False) 734 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 735 spec.Basic.Nack, 736 user_ack_nack_callback, False) 737 738 def test_confirm_delivery_callback_without_nowait_selectok(self): 739 self.obj._set_state(self.obj.OPEN) 740 expectation = [ 741 self.obj.channel_number, 742 spec.Confirm.SelectOk, 743 self.obj._on_selectok 744 ] 745 self.obj.confirm_delivery(ack_nack_callback=logging.debug, 746 callback=self.obj._on_selectok) 747 self.obj.callbacks.add.assert_called_with(*expectation, arguments=None) 748 749 def test_confirm_delivery_callback_basic_ack(self): 750 self.obj._set_state(self.obj.OPEN) 751 expectation = (self.obj.channel_number, spec.Basic.Ack, logging.debug, 752 False) 753 self.obj.confirm_delivery(ack_nack_callback=logging.debug) 754 self.obj.callbacks.add.assert_any_call(*expectation) 755 756 def test_confirm_delivery_callback_basic_nack(self): 757 self.obj._set_state(self.obj.OPEN) 758 expectation = (self.obj.channel_number, spec.Basic.Nack, logging.debug, 759 False) 760 self.obj.confirm_delivery(ack_nack_callback=logging.debug) 761 self.obj.callbacks.add.assert_any_call(*expectation) 762 763 def test_confirm_delivery_no_callback_callback_call_count(self): 764 self.obj._set_state(self.obj.OPEN) 765 user_ack_nack_callback = mock.Mock() 766 self.obj.confirm_delivery(ack_nack_callback=user_ack_nack_callback, 767 callback=self.obj._on_selectok) 768 expectation = [ 769 mock.call( 770 *[ 771 self.obj.channel_number, 772 spec.Basic.Ack, 773 user_ack_nack_callback, 774 False 775 ]), 776 mock.call( 777 *[ 778 self.obj.channel_number, 779 spec.Basic.Nack, 780 user_ack_nack_callback, 781 False 782 ]), 783 mock.call( 784 *[ 785 self.obj.channel_number, 786 spec.Confirm.SelectOk, 787 self.obj._on_synchronous_complete 788 ], 789 arguments=None), 790 mock.call( 791 *[ 792 self.obj.channel_number, 793 spec.Confirm.SelectOk, 794 self.obj._on_selectok, 795 ], 796 arguments=None) 797 ] 798 self.assertEqual(self.obj.callbacks.add.call_args_list, expectation) 799 800 def test_confirm_delivery_callback_yes_basic_ack_callback(self): 801 self.obj._set_state(self.obj.OPEN) 802 user_callback = mock.Mock() 803 expectation = [self.obj.channel_number, spec.Basic.Ack, user_callback, False] 804 expectation_item = mock.call(*expectation) 805 self.obj.confirm_delivery(ack_nack_callback=user_callback) 806 self.assertIn(expectation_item, self.obj.callbacks.add.call_args_list) 807 808 def test_confirm_delivery_callback_yes_basic_nack_callback(self): 809 self.obj._set_state(self.obj.OPEN) 810 user_callback = mock.Mock() 811 expectation = [self.obj.channel_number, spec.Basic.Nack, user_callback, False] 812 expectation_item = mock.call(*expectation) 813 self.obj.confirm_delivery(ack_nack_callback=user_callback) 814 self.assertIn(expectation_item, self.obj.callbacks.add.call_args_list) 815 816 def test_consumer_tags(self): 817 self.assertListEqual(self.obj.consumer_tags, 818 list(self.obj._consumers.keys())) 819 820 def test_exchange_bind_raises_channel_wrong_state(self): 821 self.assertRaises(exceptions.ChannelWrongStateError, 822 self.obj.exchange_bind, 823 'foo', 'bar', 'baz', None, None) 824 825 def test_exchange_bind_raises_value_error_on_invalid_callback(self): 826 self.obj._set_state(self.obj.OPEN) 827 self.assertRaises(TypeError, self.obj.exchange_bind, 828 'foo', 'bar', 'baz', None, 'callback') 829 830 @mock.patch('pika.spec.Exchange.Bind') 831 @mock.patch('pika.channel.Channel._rpc') 832 def test_exchange_bind_rpc_request(self, rpc, _unused): 833 self.obj._set_state(self.obj.OPEN) 834 mock_callback = mock.Mock() 835 self.obj.exchange_bind('foo', 'bar', 'baz', callback=mock_callback) 836 rpc.assert_called_once_with( 837 spec.Exchange.Bind(0, 'foo', 'bar', 'baz'), mock_callback, 838 [spec.Exchange.BindOk]) 839 840 @mock.patch('pika.spec.Exchange.Bind') 841 @mock.patch('pika.channel.Channel._rpc') 842 def test_exchange_bind_rpc_request_nowait(self, rpc, _unused): 843 self.obj._set_state(self.obj.OPEN) 844 self.obj.exchange_bind('foo', 'bar', 'baz', callback=None) 845 rpc.assert_called_once_with( 846 spec.Exchange.Bind(0, 'foo', 'bar', 'baz'), None, []) 847 848 def test_exchange_declare_raises_channel_wrong_state(self): 849 self.assertRaises( 850 exceptions.ChannelWrongStateError, 851 self.obj.exchange_declare, 852 exchange='foo') 853 854 def test_exchange_declare_raises_value_error_on_invalid_callback(self): 855 self.obj._set_state(self.obj.OPEN) 856 self.assertRaises(TypeError, self.obj.exchange_declare, 857 'foo', callback='callback') 858 859 @mock.patch('pika.spec.Exchange.Declare') 860 @mock.patch('pika.channel.Channel._rpc') 861 def test_exchange_declare_rpc_request(self, rpc, _unused): 862 self.obj._set_state(self.obj.OPEN) 863 mock_callback = mock.Mock() 864 self.obj.exchange_declare('foo', callback=mock_callback) 865 rpc.assert_called_once_with( 866 spec.Exchange.Declare(0, 'foo'), mock_callback, 867 [spec.Exchange.DeclareOk]) 868 869 @mock.patch('pika.spec.Exchange.Declare') 870 @mock.patch('pika.channel.Channel._rpc') 871 def test_exchange_declare_rpc_request_nowait(self, rpc, _unused): 872 self.obj._set_state(self.obj.OPEN) 873 self.obj.exchange_declare('foo', callback=None) 874 rpc.assert_called_once_with( 875 spec.Exchange.Declare(0, 'foo'), None, []) 876 877 def test_exchange_delete_raises_channel_wrong_state(self): 878 self.assertRaises( 879 exceptions.ChannelWrongStateError, 880 self.obj.exchange_delete, exchange='foo') 881 882 def test_exchange_delete_raises_value_error_on_invalid_callback(self): 883 self.obj._set_state(self.obj.OPEN) 884 self.assertRaises(TypeError, self.obj.exchange_delete, 885 'foo', callback='callback') 886 887 @mock.patch('pika.spec.Exchange.Delete') 888 @mock.patch('pika.channel.Channel._rpc') 889 def test_exchange_delete_rpc_request(self, rpc, _unused): 890 self.obj._set_state(self.obj.OPEN) 891 mock_callback = mock.Mock() 892 self.obj.exchange_delete('foo', callback=mock_callback) 893 rpc.assert_called_once_with( 894 spec.Exchange.Delete(0, 'foo'), mock_callback, 895 [spec.Exchange.DeleteOk]) 896 897 @mock.patch('pika.spec.Exchange.Delete') 898 @mock.patch('pika.channel.Channel._rpc') 899 def test_exchange_delete_rpc_request_nowait(self, rpc, _unused): 900 self.obj._set_state(self.obj.OPEN) 901 self.obj.exchange_delete('foo', callback=None) 902 rpc.assert_called_once_with( 903 spec.Exchange.Delete(0, 'foo'), None, []) 904 905 def test_exchange_unbind_raises_channel_wrong_state(self): 906 self.assertRaises(exceptions.ChannelWrongStateError, 907 self.obj.exchange_unbind, 908 None, 'foo', 'bar', 'baz') 909 910 def test_exchange_unbind_raises_value_error_on_invalid_callback(self): 911 self.obj._set_state(self.obj.OPEN) 912 self.assertRaises(TypeError, self.obj.exchange_unbind, 913 'foo', 'bar', 'baz', callback='callback') 914 915 @mock.patch('pika.spec.Exchange.Unbind') 916 @mock.patch('pika.channel.Channel._rpc') 917 def test_exchange_unbind_rpc_request(self, rpc, _unused): 918 self.obj._set_state(self.obj.OPEN) 919 mock_callback = mock.Mock() 920 self.obj.exchange_unbind('foo', 'bar', 'baz', callback=mock_callback) 921 rpc.assert_called_once_with( 922 spec.Exchange.Unbind(0, 'foo', 'bar', 'baz'), mock_callback, 923 [spec.Exchange.UnbindOk]) 924 925 @mock.patch('pika.spec.Exchange.Unbind') 926 @mock.patch('pika.channel.Channel._rpc') 927 def test_exchange_unbind_rpc_request_nowait(self, rpc, _unused): 928 self.obj._set_state(self.obj.OPEN) 929 mock_callback = mock.Mock() 930 self.obj.exchange_unbind( 931 mock_callback, 'foo', 'bar', 'baz', callback=None) 932 rpc.assert_called_once_with( 933 spec.Exchange.Unbind(0, 'foo', 'bar', 'baz'), None, []) 934 935 def test_flow_raises_channel_wrong_state(self): 936 self.assertRaises(exceptions.ChannelWrongStateError, 937 self.obj.flow, True, 'foo') 938 939 def test_flow_raises_invalid_callback(self): 940 self.obj._set_state(self.obj.OPEN) 941 self.assertRaises(TypeError, self.obj.flow, True, 'foo') 942 943 @mock.patch('pika.spec.Channel.Flow') 944 @mock.patch('pika.channel.Channel._rpc') 945 def test_flow_on_rpc_request(self, rpc, _unused): 946 self.obj._set_state(self.obj.OPEN) 947 mock_callback = mock.Mock() 948 self.obj.flow(True, callback=mock_callback) 949 rpc.assert_called_once_with( 950 spec.Channel.Flow(True), self.obj._on_flowok, 951 [spec.Channel.FlowOk]) 952 953 @mock.patch('pika.spec.Channel.Flow') 954 @mock.patch('pika.channel.Channel._rpc') 955 def test_flow_off_rpc_request(self, rpc, _unused): 956 self.obj._set_state(self.obj.OPEN) 957 mock_callback = mock.Mock() 958 self.obj.flow(False, callback=mock_callback) 959 rpc.assert_called_once_with( 960 spec.Channel.Flow(False), self.obj._on_flowok, 961 [spec.Channel.FlowOk]) 962 963 @mock.patch('pika.channel.Channel._rpc') 964 def test_flow_on_flowok_callback(self, _rpc): 965 self.obj._set_state(self.obj.OPEN) 966 mock_callback = mock.Mock() 967 self.obj.flow(True, callback=mock_callback) 968 self.assertEqual(self.obj._on_flowok_callback, mock_callback) 969 970 def test_is_closed_true(self): 971 self.obj._set_state(self.obj.CLOSED) 972 self.assertTrue(self.obj.is_closed) 973 974 def test_is_closed_false(self): 975 self.obj._set_state(self.obj.OPEN) 976 self.assertFalse(self.obj.is_closed) 977 978 def test_is_closing_true(self): 979 self.obj._set_state(self.obj.CLOSING) 980 self.assertTrue(self.obj.is_closing) 981 982 def test_is_closing_false(self): 983 self.obj._set_state(self.obj.OPEN) 984 self.assertFalse(self.obj.is_closing) 985 986 @mock.patch('pika.channel.Channel._rpc') 987 def test_channel_open_add_callbacks_called(self, _rpc): 988 with mock.patch.object(self.obj, '_add_callbacks') as _add_callbacks: 989 self.obj.open() 990 _add_callbacks.assert_called_once_with() 991 992 def test_queue_bind_raises_channel_wrong_state(self): 993 self.assertRaises(exceptions.ChannelWrongStateError, 994 self.obj.queue_bind, '', 995 'foo', 'bar', 'baz') 996 997 def test_queue_bind_raises_value_error_on_invalid_callback(self): 998 self.obj._set_state(self.obj.OPEN) 999 self.assertRaises(TypeError, self.obj.queue_bind, 1000 'foo', 'bar', 'baz', callback='callback') 1001 1002 @mock.patch('pika.spec.Queue.Bind') 1003 @mock.patch('pika.channel.Channel._rpc') 1004 def test_queue_bind_rpc_request(self, rpc, _unused): 1005 self.obj._set_state(self.obj.OPEN) 1006 mock_callback = mock.Mock() 1007 self.obj.queue_bind('foo', 'bar', 'baz', callback=mock_callback) 1008 rpc.assert_called_once_with( 1009 spec.Queue.Bind(0, 'foo', 'bar', 'baz'), mock_callback, 1010 [spec.Queue.BindOk]) 1011 1012 @mock.patch('pika.spec.Queue.Bind') 1013 @mock.patch('pika.channel.Channel._rpc') 1014 def test_queue_bind_rpc_request_nowait(self, rpc, _unused): 1015 self.obj._set_state(self.obj.OPEN) 1016 self.obj.queue_bind('foo', 'bar', 'baz', callback=None) 1017 rpc.assert_called_once_with( 1018 spec.Queue.Bind(0, 'foo', 'bar', 'baz'), None, []) 1019 1020 def test_queue_declare_raises_channel_wrong_state(self): 1021 self.assertRaises( 1022 exceptions.ChannelWrongStateError, 1023 self.obj.queue_declare, 1024 queue='foo', 1025 callback=None) 1026 1027 def test_queue_declare_raises_value_error_on_invalid_callback(self): 1028 self.obj._set_state(self.obj.OPEN) 1029 self.assertRaises(TypeError, self.obj.queue_declare, 1030 'foo', callback='callback') 1031 1032 @mock.patch('pika.spec.Queue.Declare') 1033 @mock.patch('pika.channel.Channel._rpc') 1034 def test_queue_declare_rpc_request(self, rpc, _unused): 1035 self.obj._set_state(self.obj.OPEN) 1036 mock_callback = mock.Mock() 1037 self.obj.queue_declare('foo', callback=mock_callback) 1038 rpc.assert_called_once_with( 1039 spec.Queue.Declare(0, 'foo'), mock_callback, 1040 [(spec.Queue.DeclareOk, { 1041 'queue': 'foo' 1042 })]) 1043 1044 @mock.patch('pika.spec.Queue.Declare') 1045 @mock.patch('pika.channel.Channel._rpc') 1046 def test_queue_declare_rpc_request_nowait(self, rpc, _unused): 1047 self.obj._set_state(self.obj.OPEN) 1048 self.obj.queue_declare('foo', callback=None) 1049 rpc.assert_called_once_with( 1050 spec.Queue.Declare(0, 'foo'), None, []) 1051 1052 def test_queue_delete_raises_channel_wrong_state(self): 1053 self.assertRaises( 1054 exceptions.ChannelWrongStateError, 1055 self.obj.queue_delete, queue='foo') 1056 1057 def test_queue_delete_raises_value_error_on_invalid_callback(self): 1058 self.obj._set_state(self.obj.OPEN) 1059 self.assertRaises(TypeError, self.obj.queue_delete, 1060 'foo', callback='callback') 1061 1062 @mock.patch('pika.spec.Queue.Delete') 1063 @mock.patch('pika.channel.Channel._rpc') 1064 def test_queue_delete_rpc_request(self, rpc, _unused): 1065 self.obj._set_state(self.obj.OPEN) 1066 mock_callback = mock.Mock() 1067 self.obj.queue_delete('foo', callback=mock_callback) 1068 rpc.assert_called_once_with( 1069 spec.Queue.Delete(0, 'foo'), mock_callback, [spec.Queue.DeleteOk]) 1070 1071 @mock.patch('pika.spec.Queue.Delete') 1072 @mock.patch('pika.channel.Channel._rpc') 1073 def test_queue_delete_rpc_request_nowait(self, rpc, _unused): 1074 self.obj._set_state(self.obj.OPEN) 1075 self.obj.queue_delete('foo', callback=None) 1076 rpc.assert_called_once_with( 1077 spec.Queue.Delete(0, 'foo'), None, []) 1078 1079 def test_queue_purge_raises_channel_wrong_state(self): 1080 self.assertRaises( 1081 exceptions.ChannelWrongStateError, 1082 self.obj.queue_purge, queue='foo') 1083 1084 def test_queue_purge_raises_value_error_on_invalid_callback(self): 1085 self.obj._set_state(self.obj.OPEN) 1086 self.assertRaises(TypeError, self.obj.queue_purge, 1087 'foo', callback='callback') 1088 1089 @mock.patch('pika.spec.Queue.Purge') 1090 @mock.patch('pika.channel.Channel._rpc') 1091 def test_queue_purge_rpc_request(self, rpc, _unused): 1092 self.obj._set_state(self.obj.OPEN) 1093 mock_callback = mock.Mock() 1094 self.obj.queue_purge('foo', callback=mock_callback) 1095 rpc.assert_called_once_with( 1096 spec.Queue.Purge(0, 'foo'), mock_callback, [spec.Queue.PurgeOk]) 1097 1098 @mock.patch('pika.spec.Queue.Purge') 1099 @mock.patch('pika.channel.Channel._rpc') 1100 def test_queue_purge_rpc_request_nowait(self, rpc, _unused): 1101 self.obj._set_state(self.obj.OPEN) 1102 self.obj.queue_purge('foo', callback=None) 1103 rpc.assert_called_once_with( 1104 spec.Queue.Purge(0, 'foo'), None, []) 1105 1106 def test_queue_unbind_raises_channel_wrong_state(self): 1107 self.assertRaises(exceptions.ChannelWrongStateError, 1108 self.obj.queue_unbind, 1109 'foo', 'bar', 'baz', callback=None) 1110 1111 def test_queue_unbind_raises_value_error_on_invalid_callback(self): 1112 self.obj._set_state(self.obj.OPEN) 1113 self.assertRaises(TypeError, self.obj.queue_unbind, 'foo', 1114 'bar', 'baz', callback='callback') 1115 1116 @mock.patch('pika.spec.Queue.Unbind') 1117 @mock.patch('pika.channel.Channel._rpc') 1118 def test_queue_unbind_rpc_request(self, rpc, _unused): 1119 self.obj._set_state(self.obj.OPEN) 1120 mock_callback = mock.Mock() 1121 self.obj.queue_unbind('foo', 'bar', 'baz', callback=mock_callback) 1122 rpc.assert_called_once_with( 1123 spec.Queue.Unbind(0, 'foo', 'bar', 'baz'), mock_callback, 1124 [spec.Queue.UnbindOk]) 1125 1126 def test_tx_commit_raises_channel_wrong_state(self): 1127 self.assertRaises(exceptions.ChannelWrongStateError, 1128 self.obj.tx_commit, None) 1129 1130 @mock.patch('pika.spec.Tx.Commit') 1131 @mock.patch('pika.channel.Channel._rpc') 1132 def test_tx_commit_rpc_request(self, rpc, _unused): 1133 self.obj._set_state(self.obj.OPEN) 1134 mock_callback = mock.Mock() 1135 self.obj.tx_commit(callback=mock_callback) 1136 rpc.assert_called_once_with(spec.Tx.Commit(), mock_callback, 1137 [spec.Tx.CommitOk]) 1138 1139 @mock.patch('pika.spec.Tx.Rollback') 1140 @mock.patch('pika.channel.Channel._rpc') 1141 def test_tx_rollback_rpc_request(self, rpc, _unused): 1142 self.obj._set_state(self.obj.OPEN) 1143 mock_callback = mock.Mock() 1144 self.obj.tx_rollback(callback=mock_callback) 1145 rpc.assert_called_once_with(spec.Tx.Rollback(), mock_callback, 1146 [spec.Tx.RollbackOk]) 1147 1148 @mock.patch('pika.spec.Tx.Select') 1149 @mock.patch('pika.channel.Channel._rpc') 1150 def test_tx_select_rpc_request(self, rpc, _unused): 1151 self.obj._set_state(self.obj.OPEN) 1152 mock_callback = mock.Mock() 1153 self.obj.tx_select(callback=mock_callback) 1154 rpc.assert_called_once_with(spec.Tx.Select(), mock_callback, 1155 [spec.Tx.SelectOk]) 1156 1157 # Test internal methods 1158 1159 def test_add_callbacks_basic_cancel_empty_added(self): 1160 self.obj._add_callbacks() 1161 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 1162 spec.Basic.Cancel, 1163 self.obj._on_cancel, False) 1164 1165 def test_add_callbacks_basic_get_empty_added(self): 1166 self.obj._add_callbacks() 1167 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 1168 spec.Basic.GetEmpty, 1169 self.obj._on_getempty, False) 1170 1171 def test_add_callbacks_channel_close_added(self): 1172 self.obj._add_callbacks() 1173 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 1174 spec.Channel.Close, 1175 self.obj._on_close_from_broker, 1176 True) 1177 1178 def test_add_callbacks_channel_flow_added(self): 1179 self.obj._add_callbacks() 1180 self.obj.callbacks.add.assert_any_call(self.obj.channel_number, 1181 spec.Channel.Flow, 1182 self.obj._on_flow, False) 1183 1184 def test_cleanup(self): 1185 self.obj._cleanup() 1186 self.obj.callbacks.cleanup.assert_called_once_with( 1187 str(self.obj.channel_number)) 1188 1189 def test_handle_content_frame_method_returns_none(self): 1190 frame_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1)) 1191 self.assertEqual(self.obj._handle_content_frame(frame_value), None) 1192 1193 def test_handle_content_frame_sets_method_frame(self): 1194 frame_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1)) 1195 self.obj._handle_content_frame(frame_value) 1196 self.assertEqual(self.obj._content_assembler._method_frame, 1197 frame_value) 1198 1199 def test_handle_content_frame_sets_header_frame(self): 1200 frame_value = frame.Header(1, 10, spec.BasicProperties()) 1201 self.obj._handle_content_frame(frame_value) 1202 self.assertEqual(self.obj._content_assembler._header_frame, 1203 frame_value) 1204 1205 def test_handle_content_frame_basic_deliver_called(self): 1206 method_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1)) 1207 self.obj._handle_content_frame(method_value) 1208 header_value = frame.Header(1, 10, spec.BasicProperties()) 1209 self.obj._handle_content_frame(header_value) 1210 body_value = frame.Body(1, b'0123456789') 1211 with mock.patch.object(self.obj, '_on_deliver') as deliver: 1212 self.obj._handle_content_frame(body_value) 1213 deliver.assert_called_once_with(method_value, header_value, 1214 b'0123456789') 1215 1216 def test_handle_content_frame_basic_get_called(self): 1217 method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1)) 1218 self.obj._handle_content_frame(method_value) 1219 header_value = frame.Header(1, 10, spec.BasicProperties()) 1220 self.obj._handle_content_frame(header_value) 1221 body_value = frame.Body(1, b'0123456789') 1222 with mock.patch.object(self.obj, '_on_getok') as getok: 1223 self.obj._handle_content_frame(body_value) 1224 getok.assert_called_once_with(method_value, header_value, 1225 b'0123456789') 1226 1227 def test_handle_content_frame_basic_return_called(self): 1228 method_value = frame.Method(1, 1229 spec.Basic.Return(999, 'Reply Text', 1230 'exchange_value', 1231 'routing.key')) 1232 self.obj._handle_content_frame(method_value) 1233 header_value = frame.Header(1, 10, spec.BasicProperties()) 1234 self.obj._handle_content_frame(header_value) 1235 body_value = frame.Body(1, b'0123456789') 1236 with mock.patch.object(self.obj, '_on_return') as basic_return: 1237 self.obj._handle_content_frame(body_value) 1238 basic_return.assert_called_once_with(method_value, header_value, 1239 b'0123456789') 1240 1241 def test_has_content_true(self): 1242 self.assertTrue(spec.has_content(spec.Basic.GetOk.INDEX)) 1243 1244 def test_has_content_false(self): 1245 self.assertFalse(spec.has_content(spec.Basic.Ack.INDEX)) 1246 1247 def test_on_cancel_not_appended_cancelled(self): 1248 consumer_tag = 'ctag0' 1249 frame_value = frame.Method(1, spec.Basic.Cancel(consumer_tag)) 1250 self.obj._on_cancel(frame_value) 1251 self.assertNotIn(consumer_tag, self.obj._cancelled) 1252 1253 def test_on_cancel_removed_consumer(self): 1254 consumer_tag = 'ctag0' 1255 self.obj._consumers[consumer_tag] = logging.debug 1256 frame_value = frame.Method(1, spec.Basic.Cancel(consumer_tag)) 1257 self.obj._on_cancel(frame_value) 1258 self.assertNotIn(consumer_tag, self.obj._consumers) 1259 1260 def test_on_cancelok_removed_consumer(self): 1261 consumer_tag = 'ctag0' 1262 self.obj._consumers[consumer_tag] = logging.debug 1263 frame_value = frame.Method(1, spec.Basic.CancelOk(consumer_tag)) 1264 self.obj._on_cancelok(frame_value) 1265 self.assertNotIn(consumer_tag, self.obj._consumers) 1266 1267 @mock.patch('pika.spec.Channel.CloseOk') 1268 def test_on_close_from_broker_in_open_state(self, _unused): 1269 self.obj._set_state(self.obj.OPEN) 1270 self.obj._send_method = mock.Mock(wraps=self.obj._send_method) 1271 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1272 1273 method_frame = frame.Method(self.obj.channel_number, 1274 spec.Channel.Close(400, 'error')) 1275 self.obj._on_close_from_broker(method_frame) 1276 1277 self.assertTrue(self.obj.is_closed, 1278 'Channel was not closed; state=%s' % 1279 (self.obj._state, )) 1280 1281 self.obj._send_method.assert_called_once_with(spec.Channel.CloseOk()) 1282 1283 self.obj.callbacks.process.assert_any_call( 1284 self.obj.channel_number, '_on_channel_close', self.obj, self.obj, 1285 mock.ANY) 1286 1287 reason = self.obj.callbacks.process.call_args_list[0][0][4] 1288 self.assertIsInstance(reason, exceptions.ChannelClosedByBroker) 1289 self.assertEqual((reason.reply_code, reason.reply_text), 1290 (400, 'error')) 1291 1292 self.assertEqual(self.obj._cleanup.call_count, 1) 1293 1294 def test_on_close_from_broker_in_closing_state(self): 1295 self.obj._set_state(self.obj.CLOSING) 1296 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1297 1298 method_frame = frame.Method(self.obj.channel_number, 1299 spec.Channel.Close(400, 'error')) 1300 self.obj._on_close_from_broker(method_frame) 1301 1302 # Verify didn't alter state (will wait for CloseOk) 1303 self.assertTrue(self.obj.is_closing, 1304 'Channel was not closed; state=%s' % 1305 (self.obj._state, )) 1306 1307 self.assertIsInstance(self.obj._closing_reason, 1308 exceptions.ChannelClosedByBroker) 1309 self.assertEqual(self.obj._closing_reason.reply_code, 400) 1310 self.assertEqual(self.obj._closing_reason.reply_text, 'error') 1311 1312 self.assertFalse(self.obj.callbacks.process.called, 1313 self.obj.callbacks.process.call_args_list) 1314 1315 self.assertFalse(self.obj._cleanup.called) 1316 1317 @mock.patch('logging.Logger.warning') 1318 def test_on_close_from_broker_warning(self, warning): 1319 self.obj._state = channel.Channel.OPEN 1320 method_frame = frame.Method(self.obj.channel_number, 1321 spec.Channel.Close(999, 'Test_Value')) 1322 self.obj._on_close_from_broker(method_frame) 1323 warning.assert_called_once_with( 1324 'Received remote Channel.Close (%s): %r on %s', 1325 method_frame.method.reply_code, method_frame.method.reply_text, 1326 self.obj) 1327 1328 self.assertIsInstance(self.obj._closing_reason, 1329 exceptions.ChannelClosedByBroker) 1330 1331 def _verify_on_close_meta_transitions_to_closed(self, initial_state): 1332 self.obj._set_state(initial_state) 1333 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1334 1335 reason = Exception('Oops') 1336 self.obj._on_close_meta(reason) 1337 1338 self.assertTrue(self.obj.is_closed) 1339 1340 self.assertEqual(self.obj._cleanup.call_count, 1) 1341 1342 self.assertEqual(self.obj.callbacks.process.call_count, 2) 1343 1344 self.obj.callbacks.process.assert_any_call( 1345 self.obj.channel_number, '_on_channel_close', self.obj, self.obj, 1346 reason) 1347 1348 self.obj.callbacks.process.assert_any_call( 1349 self.obj.channel_number, self.obj._ON_CHANNEL_CLEANUP_CB_KEY, 1350 self.obj, self.obj) 1351 1352 def test_on_close_meta_in_opening_state_transitions_to_closed(self): 1353 self._verify_on_close_meta_transitions_to_closed(self.obj.OPENING) 1354 1355 def test_on_close_meta_in_open_state_transitions_to_closed(self): 1356 self._verify_on_close_meta_transitions_to_closed(self.obj.OPEN) 1357 1358 def test_on_close_meta_in_closing_state_transitions_to_closed(self): 1359 self._verify_on_close_meta_transitions_to_closed(self.obj.CLOSING) 1360 1361 def test_on_close_meta_in_closed_state_is_suppressed(self): 1362 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1363 self.obj._set_state(self.obj.CLOSED) 1364 1365 self.obj._on_close_meta(Exception('Internal error')) 1366 1367 self.assertTrue(self.obj.is_closed) 1368 self.assertEqual(self.obj.callbacks.process.call_count, 0) 1369 self.assertEqual(self.obj._cleanup.call_count, 0) 1370 1371 def test_on_deliver_callback_called(self): 1372 self.obj._set_state(self.obj.OPEN) 1373 consumer_tag = 'ctag0' 1374 mock_callback = mock.Mock() 1375 self.obj._consumers[consumer_tag] = mock_callback 1376 method_value = frame.Method(1, spec.Basic.Deliver(consumer_tag, 1)) 1377 header_value = frame.Header(1, 10, spec.BasicProperties()) 1378 body_value = b'0123456789' 1379 self.obj._on_deliver(method_value, header_value, body_value) 1380 mock_callback.assert_called_with(self.obj, method_value.method, 1381 header_value.properties, body_value) 1382 1383 def test_on_closeok(self): 1384 self.obj._set_state(self.obj.OPEN) 1385 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1386 1387 # Close from user 1388 self.obj.close(200, 'All is well') 1389 self.assertEqual(self.obj._closing_reason.reply_code, 200) 1390 self.assertEqual(self.obj._closing_reason.reply_text, 'All is well') 1391 self.assertEqual(self.obj._state, self.obj.CLOSING) 1392 1393 self.obj._on_closeok( 1394 frame.Method(self.obj.channel_number, spec.Channel.CloseOk())) 1395 1396 self.assertTrue(self.obj.is_closed, 1397 'Channel was not closed; state=%s' % 1398 (self.obj._state, )) 1399 1400 self.obj.callbacks.process.assert_any_call(self.obj.channel_number, 1401 '_on_channel_close', 1402 self.obj, self.obj, 1403 mock.ANY) 1404 reason = self.obj.callbacks.process.call_args_list[0][0][4] 1405 self.assertIsInstance(reason, exceptions.ChannelClosedByClient) 1406 self.assertEqual((reason.reply_code, reason.reply_text), 1407 (200, 'All is well')) 1408 1409 self.assertEqual(self.obj._cleanup.call_count, 1) 1410 1411 def test_on_closeok_following_close_from_broker(self): 1412 self.obj._set_state(self.obj.OPEN) 1413 self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup) 1414 1415 # Close from user 1416 self.obj.close(0, 'All is well') 1417 self.assertEqual(self.obj._closing_reason.reply_code, 0) 1418 self.assertEqual(self.obj._closing_reason.reply_text, 'All is well') 1419 self.assertEqual(self.obj._state, self.obj.CLOSING) 1420 1421 # Close from broker before Channel.CloseOk 1422 self.obj._on_close_from_broker( 1423 frame.Method(self.obj.channel_number, 1424 spec.Channel.Close(400, 1425 'broker is having a bad day'))) 1426 1427 self.assertIsInstance(self.obj._closing_reason, 1428 exceptions.ChannelClosedByBroker) 1429 self.assertEqual( 1430 (self.obj._closing_reason.reply_code, 1431 self.obj._closing_reason.reply_text), 1432 (400, 'broker is having a bad day')) 1433 self.assertEqual(self.obj._state, self.obj.CLOSING) 1434 1435 self.obj._on_closeok( 1436 frame.Method(self.obj.channel_number, spec.Channel.CloseOk())) 1437 1438 # Verify this completes closing of the channel 1439 self.assertTrue(self.obj.is_closed, 1440 'Channel was not closed; state=%s' % 1441 (self.obj._state, )) 1442 1443 self.assertEqual(self.obj.callbacks.process.call_count, 2) 1444 1445 self.obj.callbacks.process.assert_any_call( 1446 self.obj.channel_number, '_on_channel_close', self.obj, self.obj, 1447 mock.ANY) 1448 1449 self.assertEqual(self.obj._cleanup.call_count, 1) 1450 1451 @mock.patch('logging.Logger.debug') 1452 def test_on_getempty(self, debug): 1453 method_frame = frame.Method(self.obj.channel_number, 1454 spec.Basic.GetEmpty) 1455 self.obj._on_getempty(method_frame) 1456 debug.assert_called_with('Received Basic.GetEmpty: %r', method_frame) 1457 1458 @mock.patch('logging.Logger.error') 1459 def test_on_getok_no_callback(self, error): 1460 method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1)) 1461 header_value = frame.Header(1, 10, spec.BasicProperties()) 1462 body_value = b'0123456789' 1463 self.obj._on_getok(method_value, header_value, body_value) 1464 error.assert_called_with( 1465 'Basic.GetOk received with no active callback') 1466 1467 def test_on_getok_callback_called(self): 1468 mock_callback = mock.Mock() 1469 self.obj._on_getok_callback = mock_callback 1470 method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1)) 1471 header_value = frame.Header(1, 10, spec.BasicProperties()) 1472 body_value = b'0123456789' 1473 self.obj._on_getok(method_value, header_value, body_value) 1474 mock_callback.assert_called_once_with( 1475 self.obj, method_value.method, header_value.properties, body_value) 1476 1477 def test_on_getok_callback_reset(self): 1478 mock_callback = mock.Mock() 1479 self.obj._on_getok_callback = mock_callback 1480 method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1)) 1481 header_value = frame.Header(1, 10, spec.BasicProperties()) 1482 body_value = b'0123456789' 1483 self.obj._on_getok(method_value, header_value, body_value) 1484 self.assertIsNone(self.obj._on_getok_callback) 1485 1486 @mock.patch('logging.Logger.debug') 1487 def test_on_confirm_selectok(self, debug): 1488 method_frame = frame.Method(self.obj.channel_number, 1489 spec.Confirm.SelectOk()) 1490 self.obj._on_selectok(method_frame) 1491 debug.assert_called_with('Confirm.SelectOk Received: %r', method_frame) 1492 1493 @mock.patch('logging.Logger.debug') 1494 def test_on_eventok(self, debug): 1495 method_frame = frame.Method(self.obj.channel_number, 1496 spec.Basic.GetEmpty()) 1497 self.obj._on_eventok(method_frame) 1498 debug.assert_called_with('Discarding frame %r', method_frame) 1499 1500 @mock.patch('logging.Logger.warning') 1501 def test_on_flow(self, warning): 1502 self.obj._has_on_flow_callback = False 1503 method_frame = frame.Method(self.obj.channel_number, 1504 spec.Channel.Flow()) 1505 self.obj._on_flow(method_frame) 1506 warning.assert_called_with('Channel.Flow received from server') 1507 1508 @mock.patch('logging.Logger.warning') 1509 def test_on_flow_with_callback(self, warning): 1510 method_frame = frame.Method(self.obj.channel_number, 1511 spec.Channel.Flow()) 1512 self.obj._on_flowok_callback = logging.debug 1513 self.obj._on_flow(method_frame) 1514 self.assertEqual(len(warning.call_args_list), 1) 1515 1516 @mock.patch('logging.Logger.warning') 1517 def test_on_flowok(self, warning): 1518 method_frame = frame.Method(self.obj.channel_number, 1519 spec.Channel.FlowOk()) 1520 self.obj._on_flowok(method_frame) 1521 warning.assert_called_with('Channel.FlowOk received with no active ' 1522 'callbacks') 1523 1524 def test_on_flowok_calls_callback(self): 1525 method_frame = frame.Method(self.obj.channel_number, 1526 spec.Channel.FlowOk()) 1527 mock_callback = mock.Mock() 1528 self.obj._on_flowok_callback = mock_callback 1529 self.obj._on_flowok(method_frame) 1530 mock_callback.assert_called_once_with(method_frame.method.active) 1531 1532 def test_on_flowok_callback_reset(self): 1533 method_frame = frame.Method(self.obj.channel_number, 1534 spec.Channel.FlowOk()) 1535 mock_callback = mock.Mock() 1536 self.obj._on_flowok_callback = mock_callback 1537 self.obj._on_flowok(method_frame) 1538 self.assertIsNone(self.obj._on_flowok_callback) 1539 1540 def test_on_openok_no_callback(self): 1541 self.obj._on_openok_callback = None 1542 method_value = frame.Method(1, spec.Channel.OpenOk()) 1543 self.obj._on_openok(method_value) 1544 self.assertEqual(self.obj._state, self.obj.OPEN) 1545 1546 def test_on_openok_callback_called(self): 1547 mock_callback = mock.Mock() 1548 self.obj._on_openok_callback = mock_callback 1549 method_value = frame.Method(1, spec.Channel.OpenOk()) 1550 self.obj._on_openok(method_value) 1551 mock_callback.assert_called_once_with(self.obj) 1552 1553 def test_onreturn(self): 1554 method_value = frame.Method(1, 1555 spec.Basic.Return(999, 'Reply Text', 1556 'exchange_value', 1557 'routing.key')) 1558 header_value = frame.Header(1, 10, spec.BasicProperties()) 1559 body_value = frame.Body(1, b'0123456789') 1560 self.obj._on_return(method_value, header_value, body_value) 1561 self.obj.callbacks.process.assert_called_with( 1562 self.obj.channel_number, '_on_return', self.obj, self.obj, 1563 method_value.method, header_value.properties, body_value) 1564 1565 @mock.patch('logging.Logger.warning') 1566 def test_onreturn_warning(self, warning): 1567 method_value = frame.Method(1, 1568 spec.Basic.Return(999, 'Reply Text', 1569 'exchange_value', 1570 'routing.key')) 1571 header_value = frame.Header(1, 10, spec.BasicProperties()) 1572 body_value = frame.Body(1, b'0123456789') 1573 self.obj.callbacks.process.return_value = False 1574 self.obj._on_return(method_value, header_value, body_value) 1575 warning.assert_called_with( 1576 'Basic.Return received from server (%r, %r)', method_value.method, 1577 header_value.properties) 1578 1579 @mock.patch('pika.channel.Channel._rpc') 1580 def test_on_synchronous_complete(self, rpc): 1581 mock_callback = mock.Mock() 1582 expectation = [ 1583 spec.Queue.Unbind(0, 'foo', 'bar', 'baz'), mock_callback, 1584 [spec.Queue.UnbindOk] 1585 ] 1586 self.obj._blocked = collections.deque([expectation]) 1587 self.obj._on_synchronous_complete( 1588 frame.Method(self.obj.channel_number, spec.Basic.Ack(1))) 1589 rpc.assert_called_once_with(*expectation) 1590 1591 def test_repr(self): 1592 text = repr(self.obj) 1593 self.assertTrue(text.startswith('<Channel'), text) 1594 1595 def test_rpc_raises_channel_wrong_state(self): 1596 self.assertRaises(exceptions.ChannelWrongStateError, self.obj._rpc, 1597 spec.Basic.Cancel('tag_abc')) 1598 1599 def test_rpc_while_blocking_appends_blocked_collection(self): 1600 self.obj._set_state(self.obj.OPEN) 1601 self.obj._blocking = spec.Confirm.Select() 1602 acceptable_replies = [(spec.Basic.CancelOk, { 1603 'consumer_tag': 'tag_abc' 1604 })] 1605 expectation = [ 1606 spec.Basic.Cancel('tag_abc'), lambda *args: None, 1607 acceptable_replies 1608 ] 1609 self.obj._rpc(*expectation) 1610 self.assertIn(expectation, self.obj._blocked) 1611 1612 def test_rpc_throws_value_error_with_unacceptable_replies(self): 1613 self.obj._set_state(self.obj.OPEN) 1614 self.assertRaises(TypeError, self.obj._rpc, 1615 spec.Basic.Cancel('tag_abc'), logging.debug, 'Foo') 1616 1617 def test_rpc_throws_type_error_with_invalid_callback(self): 1618 self.obj._set_state(self.obj.OPEN) 1619 self.assertRaises(TypeError, self.obj._rpc, spec.Channel.Open(1), 1620 ['foo'], [spec.Channel.OpenOk]) 1621 1622 def test_rpc_enters_blocking_and_adds_on_synchronous_complete(self): 1623 self.obj._set_state(self.obj.OPEN) 1624 method_frame = spec.Channel.Open() 1625 self.obj._rpc(method_frame, None, [spec.Channel.OpenOk]) 1626 self.assertEqual(self.obj._blocking, method_frame.NAME) 1627 self.obj.callbacks.add.assert_called_with( 1628 self.obj.channel_number, 1629 spec.Channel.OpenOk, 1630 self.obj._on_synchronous_complete, 1631 arguments=None) 1632 1633 def test_rpc_not_blocking_and_no_on_synchronous_complete_when_no_replies( 1634 self): 1635 self.obj._set_state(self.obj.OPEN) 1636 method_frame = spec.Channel.Open() 1637 self.obj._rpc(method_frame, None, acceptable_replies=[]) 1638 self.assertIsNone(self.obj._blocking) 1639 with self.assertRaises(AssertionError): 1640 self.obj.callbacks.add.assert_called_with( 1641 mock.ANY, 1642 mock.ANY, 1643 self.obj._on_synchronous_complete, 1644 arguments=mock.ANY) 1645 1646 def test_rpc_adds_callback(self): 1647 self.obj._set_state(self.obj.OPEN) 1648 method_frame = spec.Channel.Open() 1649 mock_callback = mock.Mock() 1650 self.obj._rpc(method_frame, mock_callback, [spec.Channel.OpenOk]) 1651 self.obj.callbacks.add.assert_called_with( 1652 self.obj.channel_number, 1653 spec.Channel.OpenOk, 1654 mock_callback, 1655 arguments=None) 1656 1657 def test_send_method(self): 1658 expectation = [2, 3] 1659 self.obj._send_method(*expectation) 1660 self.obj.connection._send_method.assert_called_once_with( 1661 *[self.obj.channel_number] + expectation) 1662 1663 def test_set_state(self): 1664 self.obj._state = channel.Channel.CLOSED 1665 self.obj._set_state(channel.Channel.OPENING) 1666 self.assertEqual(self.obj._state, channel.Channel.OPENING) 1667 1668 def test_raise_if_not_open_raises_channel_wrong_state(self): 1669 self.assertRaises(exceptions.ChannelWrongStateError, 1670 self.obj._raise_if_not_open) 1671 1672 def test_no_side_effects_from_send_method_error(self): 1673 self.obj._set_state(self.obj.OPEN) 1674 1675 self.assertIsNone(self.obj._blocking) 1676 1677 with mock.patch.object(self.obj.callbacks, 'add') as cb_add_mock: 1678 with mock.patch.object(self.obj, '_send_method', 1679 side_effect=TypeError) as send_method_mock: 1680 with self.assertRaises(TypeError): 1681 self.obj.queue_delete('', callback=lambda _frame: None) 1682 1683 self.assertEqual(send_method_mock.call_count, 1) 1684 self.assertIsNone(self.obj._blocking) 1685 self.assertEqual(cb_add_mock.call_count, 0) 1686