1from __future__ import absolute_import, unicode_literals 2 3import pytest 4import select 5import ssl 6import socket 7import sys 8import time 9import uuid 10 11from collections import OrderedDict 12 13try: 14 from collections.abc import Callable 15except ImportError: 16 from collections import Callable 17 18from itertools import count 19 20from case import Mock, call, patch, skip 21 22from kombu.five import Empty, range, monotonic 23from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection, 24 ConnectionError, Message, NotFound, QoS, 25 Transport) 26from kombu.transport.virtual import Base64 27 28 29QPID_MODULE = 'kombu.transport.qpid' 30 31 32@pytest.fixture 33def disable_runtime_dependency_check(patching): 34 mock_dependency_is_none = patching(QPID_MODULE + '.dependency_is_none') 35 mock_dependency_is_none.return_value = False 36 return mock_dependency_is_none 37 38 39class QpidException(Exception): 40 """ 41 An object used to mock Exceptions provided by qpid.messaging.exceptions 42 """ 43 44 def __init__(self, code=None, text=None): 45 super(Exception, self).__init__(self) 46 self.code = code 47 self.text = text 48 49 50class BreakOutException(Exception): 51 pass 52 53 54@skip.if_python3() 55@skip.if_pypy() 56class test_QoS__init__(object): 57 58 def setup(self): 59 self.mock_session = Mock() 60 self.qos = QoS(self.mock_session) 61 62 def test__init__prefetch_default_set_correct_without_prefetch_value(self): 63 assert self.qos.prefetch_count == 1 64 65 def test__init__prefetch_is_hard_set_to_one(self): 66 qos_limit_two = QoS(self.mock_session) 67 assert qos_limit_two.prefetch_count == 1 68 69 def test__init___not_yet_acked_is_initialized(self): 70 assert isinstance(self.qos._not_yet_acked, OrderedDict) 71 72 73@skip.if_python3() 74@skip.if_pypy() 75class test_QoS_can_consume(object): 76 77 def setup(self): 78 session = Mock() 79 self.qos = QoS(session) 80 81 def test_True_when_prefetch_limit_is_zero(self): 82 self.qos.prefetch_count = 0 83 
self.qos._not_yet_acked = [] 84 assert self.qos.can_consume() 85 86 def test_True_when_len_of__not_yet_acked_is_lt_prefetch_count(self): 87 self.qos.prefetch_count = 3 88 self.qos._not_yet_acked = ['a', 'b'] 89 assert self.qos.can_consume() 90 91 def test_False_when_len_of__not_yet_acked_is_eq_prefetch_count(self): 92 self.qos.prefetch_count = 3 93 self.qos._not_yet_acked = ['a', 'b', 'c'] 94 assert not self.qos.can_consume() 95 96 97@skip.if_python3() 98@skip.if_pypy() 99class test_QoS_can_consume_max_estimate(object): 100 101 def setup(self): 102 self.mock_session = Mock() 103 self.qos = QoS(self.mock_session) 104 105 def test_return_one_when_prefetch_count_eq_zero(self): 106 self.qos.prefetch_count = 0 107 assert self.qos.can_consume_max_estimate() == 1 108 109 def test_return_prefetch_count_sub_len__not_yet_acked(self): 110 self.qos._not_yet_acked = ['a', 'b'] 111 self.qos.prefetch_count = 4 112 assert self.qos.can_consume_max_estimate() == 2 113 114 115@skip.if_python3() 116@skip.if_pypy() 117class test_QoS_ack(object): 118 119 def setup(self): 120 self.mock_session = Mock() 121 self.qos = QoS(self.mock_session) 122 123 def test_ack_pops__not_yet_acked(self): 124 message = Mock() 125 self.qos.append(message, 1) 126 assert 1 in self.qos._not_yet_acked 127 self.qos.ack(1) 128 assert 1 not in self.qos._not_yet_acked 129 130 def test_ack_calls_session_acknowledge_with_message(self): 131 message = Mock() 132 self.qos.append(message, 1) 133 self.qos.ack(1) 134 self.qos.session.acknowledge.assert_called_with(message=message) 135 136 137@skip.if_python3() 138@skip.if_pypy() 139class test_QoS_reject(object): 140 141 @pytest.fixture(autouse=True) 142 def setup_qpid(self, patching): 143 self.mock_qpid = patching(QPID_MODULE + '.qpid') 144 self.mock_Disposition = self.mock_qpid.messaging.Disposition 145 self.mock_RELEASED = self.mock_qpid.messaging.RELEASED 146 self.mock_REJECTED = self.mock_qpid.messaging.REJECTED 147 148 def setup(self): 149 self.mock_session = Mock() 
150 self.mock_message = Mock() 151 self.qos = QoS(self.mock_session) 152 153 def test_reject_pops__not_yet_acked(self): 154 self.qos.append(self.mock_message, 1) 155 assert 1 in self.qos._not_yet_acked 156 self.qos.reject(1) 157 assert 1 not in self.qos._not_yet_acked 158 159 def test_reject_requeue_true(self): 160 self.qos.append(self.mock_message, 1) 161 self.qos.reject(1, requeue=True) 162 self.mock_Disposition.assert_called_with(self.mock_RELEASED) 163 self.qos.session.acknowledge.assert_called_with( 164 message=self.mock_message, 165 disposition=self.mock_Disposition.return_value, 166 ) 167 168 def test_reject_requeue_false(self): 169 message = Mock() 170 self.qos.append(message, 1) 171 self.qos.reject(1, requeue=False) 172 self.mock_Disposition.assert_called_with(self.mock_REJECTED) 173 self.qos.session.acknowledge.assert_called_with( 174 message=message, disposition=self.mock_Disposition.return_value, 175 ) 176 177 178@skip.if_python3() 179@skip.if_pypy() 180class test_QoS(object): 181 182 def mock_message_factory(self): 183 """Create and return a mock message tag and delivery_tag.""" 184 m_delivery_tag = self.delivery_tag_generator.next() 185 m = 'message %s' % (m_delivery_tag, ) 186 return m, m_delivery_tag 187 188 def add_n_messages_to_qos(self, n, qos): 189 """Add N mock messages into the passed in qos object""" 190 for i in range(n): 191 self.add_message_to_qos(qos) 192 193 def add_message_to_qos(self, qos): 194 """Add a single mock message into the passed in qos object. 195 196 Uses the mock_message_factory() to create the message and 197 delivery_tag. 
198 """ 199 m, m_delivery_tag = self.mock_message_factory() 200 qos.append(m, m_delivery_tag) 201 202 def setup(self): 203 self.mock_session = Mock() 204 self.qos_no_limit = QoS(self.mock_session) 205 self.qos_limit_2 = QoS(self.mock_session, prefetch_count=2) 206 self.delivery_tag_generator = count(1) 207 208 def test_append(self): 209 """Append two messages and check inside the QoS object that they 210 were put into the internal data structures correctly 211 """ 212 qos = self.qos_no_limit 213 m1, m1_tag = self.mock_message_factory() 214 m2, m2_tag = self.mock_message_factory() 215 qos.append(m1, m1_tag) 216 length_not_yet_acked = len(qos._not_yet_acked) 217 assert length_not_yet_acked == 1 218 checked_message1 = qos._not_yet_acked[m1_tag] 219 assert m1 is checked_message1 220 qos.append(m2, m2_tag) 221 length_not_yet_acked = len(qos._not_yet_acked) 222 assert length_not_yet_acked == 2 223 checked_message2 = qos._not_yet_acked[m2_tag] 224 assert m2 is checked_message2 225 226 def test_get(self): 227 """Append two messages, and use get to receive them""" 228 qos = self.qos_no_limit 229 m1, m1_tag = self.mock_message_factory() 230 m2, m2_tag = self.mock_message_factory() 231 qos.append(m1, m1_tag) 232 qos.append(m2, m2_tag) 233 message1 = qos.get(m1_tag) 234 message2 = qos.get(m2_tag) 235 assert m1 is message1 236 assert m2 is message2 237 238 239@skip.if_python3() 240@skip.if_pypy() 241class ConnectionTestBase(object): 242 243 @patch(QPID_MODULE + '.qpid') 244 def setup(self, mock_qpid): 245 self.connection_options = { 246 'host': 'localhost', 247 'port': 5672, 248 'transport': 'tcp', 249 'timeout': 10, 250 'sasl_mechanisms': 'ANONYMOUS', 251 } 252 self.mock_qpid_connection = mock_qpid.messaging.Connection 253 self.conn = Connection(**self.connection_options) 254 255 256@skip.if_python3() 257@skip.if_pypy() 258class test_Connection__init__(ConnectionTestBase): 259 260 def test_stores_connection_options(self): 261 # ensure that only one mech was passed into 
connection. The other 262 # options should all be passed through as-is 263 modified_conn_opts = self.connection_options 264 assert modified_conn_opts == self.conn.connection_options 265 266 def test_class_variables(self): 267 assert isinstance(self.conn.channels, list) 268 assert isinstance(self.conn._callbacks, dict) 269 270 def test_establishes_connection(self): 271 modified_conn_opts = self.connection_options 272 self.mock_qpid_connection.establish.assert_called_with( 273 **modified_conn_opts 274 ) 275 276 def test_saves_established_connection(self): 277 created_conn = self.mock_qpid_connection.establish.return_value 278 assert self.conn._qpid_conn is created_conn 279 280 @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, )) 281 @patch(QPID_MODULE + '.sys.exc_info') 282 @patch(QPID_MODULE + '.qpid') 283 def test_mutates_ConnError_by_message(self, mock_qpid, mock_exc_info): 284 text = 'connection-forced: Authentication failed(320)' 285 my_conn_error = QpidException(text=text) 286 mock_qpid.messaging.Connection.establish.side_effect = my_conn_error 287 mock_exc_info.return_value = 'a', 'b', None 288 try: 289 self.conn = Connection(**self.connection_options) 290 except AuthenticationFailure as error: 291 exc_info = sys.exc_info() 292 assert not isinstance(error, QpidException) 293 assert exc_info[1] == 'b' 294 assert exc_info[2] is None 295 else: 296 self.fail('ConnectionError type was not mutated correctly') 297 298 @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, )) 299 @patch(QPID_MODULE + '.sys.exc_info') 300 @patch(QPID_MODULE + '.qpid') 301 def test_mutates_ConnError_by_code(self, mock_qpid, mock_exc_info): 302 my_conn_error = QpidException(code=320, text='someothertext') 303 mock_qpid.messaging.Connection.establish.side_effect = my_conn_error 304 mock_exc_info.return_value = 'a', 'b', None 305 try: 306 self.conn = Connection(**self.connection_options) 307 except AuthenticationFailure as error: 308 exc_info = sys.exc_info() 309 assert 
not isinstance(error, QpidException) 310 assert exc_info[1] == 'b' 311 assert exc_info[2] is None 312 else: 313 self.fail('ConnectionError type was not mutated correctly') 314 315 @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, )) 316 @patch(QPID_MODULE + '.sys.exc_info') 317 @patch(QPID_MODULE + '.qpid') 318 def test_connection__init__mutates_ConnError_by_message2(self, mock_qpid, 319 mock_exc_info): 320 """ 321 Test for PLAIN connection via python-saslwrapper, sans cyrus-sasl-plain 322 323 This test is specific for what is returned when we attempt to connect 324 with PLAIN mech and python-saslwrapper is installed, but 325 cyrus-sasl-plain is not installed. 326 """ 327 my_conn_error = QpidException() 328 my_conn_error.text = 'Error in sasl_client_start (-4) SASL(-4): no '\ 329 'mechanism available' 330 mock_qpid.messaging.Connection.establish.side_effect = my_conn_error 331 mock_exc_info.return_value = ('a', 'b', None) 332 try: 333 self.conn = Connection(**self.connection_options) 334 except AuthenticationFailure as error: 335 exc_info = sys.exc_info() 336 assert not isinstance(error, QpidException) 337 assert exc_info[1] == 'b' 338 assert exc_info[2] is None 339 else: 340 self.fail('ConnectionError type was not mutated correctly') 341 342 @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, )) 343 @patch(QPID_MODULE + '.sys.exc_info') 344 @patch(QPID_MODULE + '.qpid') 345 def test_unknown_connection_error(self, mock_qpid, mock_exc_info): 346 # If we get a connection error that we don't understand, 347 # bubble it up as-is 348 my_conn_error = QpidException(code=999, text='someothertext') 349 mock_qpid.messaging.Connection.establish.side_effect = my_conn_error 350 mock_exc_info.return_value = 'a', 'b', None 351 try: 352 self.conn = Connection(**self.connection_options) 353 except Exception as error: 354 assert error.code == 999 355 else: 356 self.fail('Connection should have thrown an exception') 357 358 @patch.object(Transport, 
'channel_errors', new=(QpidException, )) 359 @patch(QPID_MODULE + '.qpid') 360 @patch(QPID_MODULE + '.ConnectionError', new=IOError) 361 def test_non_qpid_error_raises(self, mock_qpid): 362 mock_Qpid_Connection = mock_qpid.messaging.Connection 363 my_conn_error = SyntaxError() 364 my_conn_error.text = 'some non auth related error message' 365 mock_Qpid_Connection.establish.side_effect = my_conn_error 366 with pytest.raises(SyntaxError): 367 Connection(**self.connection_options) 368 369 @patch(QPID_MODULE + '.qpid') 370 @patch(QPID_MODULE + '.ConnectionError', new=IOError) 371 def test_non_auth_conn_error_raises(self, mock_qpid): 372 mock_Qpid_Connection = mock_qpid.messaging.Connection 373 my_conn_error = IOError() 374 my_conn_error.text = 'some non auth related error message' 375 mock_Qpid_Connection.establish.side_effect = my_conn_error 376 with pytest.raises(IOError): 377 Connection(**self.connection_options) 378 379 380@skip.if_python3() 381@skip.if_pypy() 382class test_Connection_class_attributes(ConnectionTestBase): 383 384 def test_connection_verify_class_attributes(self): 385 assert Channel == Connection.Channel 386 387 388@skip.if_python3() 389@skip.if_pypy() 390class test_Connection_get_Qpid_connection(ConnectionTestBase): 391 392 def test_connection_get_qpid_connection(self): 393 self.conn._qpid_conn = Mock() 394 returned_connection = self.conn.get_qpid_connection() 395 assert self.conn._qpid_conn is returned_connection 396 397 398@skip.if_python3() 399@skip.if_pypy() 400class test_Connection_close(ConnectionTestBase): 401 402 def test_connection_close(self): 403 self.conn._qpid_conn = Mock() 404 self.conn.close() 405 self.conn._qpid_conn.close.assert_called_once_with() 406 407 408@skip.if_python3() 409@skip.if_pypy() 410class test_Connection_close_channel(ConnectionTestBase): 411 412 def setup(self): 413 super(test_Connection_close_channel, self).setup() 414 self.conn.channels = Mock() 415 416 def 
test_connection_close_channel_removes_channel_from_channel_list(self): 417 mock_channel = Mock() 418 self.conn.close_channel(mock_channel) 419 self.conn.channels.remove.assert_called_once_with(mock_channel) 420 421 def test_connection_close_channel_handles_ValueError_being_raised(self): 422 self.conn.channels.remove = Mock(side_effect=ValueError()) 423 self.conn.close_channel(Mock()) 424 425 def test_connection_close_channel_set_channel_connection_to_None(self): 426 mock_channel = Mock() 427 mock_channel.connection = False 428 self.conn.channels.remove = Mock(side_effect=ValueError()) 429 self.conn.close_channel(mock_channel) 430 assert mock_channel.connection is None 431 432 433@skip.if_python3() 434@skip.if_pypy() 435class ChannelTestBase(object): 436 437 @pytest.fixture(autouse=True) 438 def setup_channel(self, patching): 439 self.mock_qpidtoollibs = patching(QPID_MODULE + '.qpidtoollibs') 440 self.mock_broker_agent = self.mock_qpidtoollibs.BrokerAgent 441 self.conn = Mock() 442 self.transport = Mock() 443 self.channel = Channel(self.conn, self.transport) 444 445 446@skip.if_python3() 447@skip.if_pypy() 448class test_Channel_purge(ChannelTestBase): 449 450 def setup(self): 451 self.mock_queue = Mock() 452 453 def test_gets_queue(self): 454 self.channel._purge(self.mock_queue) 455 getQueue = self.mock_broker_agent.return_value.getQueue 456 getQueue.assert_called_once_with(self.mock_queue) 457 458 def test_does_not_call_purge_if_message_count_is_zero(self): 459 values = {'msgDepth': 0} 460 queue_obj = self.mock_broker_agent.return_value.getQueue.return_value 461 queue_obj.values = values 462 self.channel._purge(self.mock_queue) 463 assert not queue_obj.purge.called 464 465 def test_purges_all_messages_from_queue(self): 466 values = {'msgDepth': 5} 467 queue_obj = self.mock_broker_agent.return_value.getQueue.return_value 468 queue_obj.values = values 469 self.channel._purge(self.mock_queue) 470 queue_obj.purge.assert_called_with(5) 471 472 def 
test_returns_message_count(self): 473 values = {'msgDepth': 5} 474 queue_obj = self.mock_broker_agent.return_value.getQueue.return_value 475 queue_obj.values = values 476 result = self.channel._purge(self.mock_queue) 477 assert result == 5 478 479 @patch(QPID_MODULE + '.NotFound', new=QpidException) 480 def test_raises_channel_error_if_queue_does_not_exist(self): 481 self.mock_broker_agent.return_value.getQueue.return_value = None 482 with pytest.raises(QpidException): 483 self.channel._purge(self.mock_queue) 484 485 486@skip.if_python3() 487@skip.if_pypy() 488class test_Channel_put(ChannelTestBase): 489 490 @patch(QPID_MODULE + '.qpid') 491 def test_channel__put_onto_queue(self, mock_qpid): 492 routing_key = 'routingkey' 493 mock_message = Mock() 494 mock_Message_cls = mock_qpid.messaging.Message 495 496 self.channel._put(routing_key, mock_message) 497 498 address_str = '{0}; {{assert: always, node: {{type: queue}}}}'.format( 499 routing_key, 500 ) 501 self.transport.session.sender.assert_called_with(address_str) 502 mock_Message_cls.assert_called_with( 503 content=mock_message, subject=None, durable=True 504 ) 505 mock_sender = self.transport.session.sender.return_value 506 mock_sender.send.assert_called_with( 507 mock_Message_cls.return_value, sync=True, 508 ) 509 mock_sender.close.assert_called_with() 510 511 @patch(QPID_MODULE + '.qpid') 512 def test_channel__put_onto_exchange(self, mock_qpid): 513 mock_routing_key = 'routingkey' 514 mock_exchange_name = 'myexchange' 515 mock_message = Mock() 516 mock_Message_cls = mock_qpid.messaging.Message 517 518 self.channel._put(mock_routing_key, mock_message, mock_exchange_name) 519 520 addrstr = '{0}/{1}; {{assert: always, node: {{type: topic}}}}'.format( 521 mock_exchange_name, mock_routing_key, 522 ) 523 self.transport.session.sender.assert_called_with(addrstr) 524 mock_Message_cls.assert_called_with( 525 content=mock_message, subject=mock_routing_key, durable=True 526 ) 527 mock_sender = 
self.transport.session.sender.return_value 528 mock_sender.send.assert_called_with( 529 mock_Message_cls.return_value, sync=True, 530 ) 531 mock_sender.close.assert_called_with() 532 533 534@skip.if_python3() 535@skip.if_pypy() 536class test_Channel_get(ChannelTestBase): 537 538 def test_channel__get(self): 539 mock_queue = Mock() 540 541 result = self.channel._get(mock_queue) 542 543 self.transport.session.receiver.assert_called_once_with(mock_queue) 544 mock_rx = self.transport.session.receiver.return_value 545 mock_rx.fetch.assert_called_once_with(timeout=0) 546 mock_rx.close.assert_called_once_with() 547 assert mock_rx.fetch.return_value is result 548 549 550@skip.if_python3() 551@skip.if_pypy() 552class test_Channel_close(ChannelTestBase): 553 554 @pytest.fixture(autouse=True) 555 def setup_basic_cancel(self, patching, setup_channel): 556 self.mock_basic_cancel = patching.object(self.channel, 'basic_cancel') 557 self.channel.closed = False 558 559 @pytest.fixture(autouse=True) 560 def setup_receivers(self, setup_channel): 561 self.mock_receiver1 = Mock() 562 self.mock_receiver2 = Mock() 563 self.channel._receivers = { 564 1: self.mock_receiver1, 2: self.mock_receiver2, 565 } 566 567 def test_channel_close_sets_close_attribute(self): 568 self.channel.close() 569 assert self.channel.closed 570 571 def test_channel_close_calls_basic_cancel_on_all_receivers(self): 572 self.channel.close() 573 self.mock_basic_cancel.assert_has_calls([call(1), call(2)]) 574 575 def test_channel_close_calls_close_channel_on_connection(self): 576 self.channel.close() 577 self.conn.close_channel.assert_called_once_with(self.channel) 578 579 def test_channel_close_calls_close_on_broker_agent(self): 580 self.channel.close() 581 self.channel._broker.close.assert_called_once_with() 582 583 def test_channel_close_does_nothing_if_already_closed(self): 584 self.channel.closed = True 585 self.channel.close() 586 self.mock_basic_cancel.assert_not_called() 587 588 def 
test_channel_close_does_not_call_close_channel_if_conn_is_None(self): 589 self.channel.connection = None 590 self.channel.close() 591 self.conn.close_channel.assert_not_called() 592 593 594@skip.if_python3() 595@skip.if_pypy() 596class test_Channel_basic_qos(ChannelTestBase): 597 598 def test_channel_basic_qos_always_returns_one(self): 599 self.channel.basic_qos(2) 600 assert self.channel.qos.prefetch_count == 1 601 602 603@skip.if_python3() 604@skip.if_pypy() 605class test_Channel_basic_get(ChannelTestBase): 606 607 @pytest.fixture(autouse=True) 608 def setup_channel_attributes(self, setup_channel): 609 self.channel.Message = Mock() 610 self.channel._get = Mock() 611 612 def test_channel_basic_get_calls__get_with_queue(self): 613 mock_queue = Mock() 614 self.channel.basic_get(mock_queue) 615 self.channel._get.assert_called_once_with(mock_queue) 616 617 def test_channel_basic_get_creates_Message_correctly(self): 618 mock_queue = Mock() 619 self.channel.basic_get(mock_queue) 620 mock_raw_message = self.channel._get.return_value.content 621 self.channel.Message.assert_called_once_with( 622 mock_raw_message, channel=self.channel, 623 ) 624 625 def test_channel_basic_get_acknowledges_message_by_default(self): 626 mock_queue = Mock() 627 self.channel.basic_get(mock_queue) 628 mock_qpid_message = self.channel._get.return_value 629 acknowledge = self.transport.session.acknowledge 630 acknowledge.assert_called_once_with(message=mock_qpid_message) 631 632 def test_channel_basic_get_acknowledges_message_with_no_ack_False(self): 633 mock_queue = Mock() 634 self.channel.basic_get(mock_queue, no_ack=False) 635 mock_qpid_message = self.channel._get.return_value 636 acknowledge = self.transport.session.acknowledge 637 acknowledge.assert_called_once_with(message=mock_qpid_message) 638 639 def test_channel_basic_get_acknowledges_message_with_no_ack_True(self): 640 mock_queue = Mock() 641 self.channel.basic_get(mock_queue, no_ack=True) 642 mock_qpid_message = 
self.channel._get.return_value 643 acknowledge = self.transport.session.acknowledge 644 acknowledge.assert_called_once_with(message=mock_qpid_message) 645 646 def test_channel_basic_get_returns_correct_message(self): 647 mock_queue = Mock() 648 basic_get_result = self.channel.basic_get(mock_queue) 649 expected_message = self.channel.Message.return_value 650 assert expected_message is basic_get_result 651 652 def test_basic_get_returns_None_when_channel__get_raises_Empty(self): 653 mock_queue = Mock() 654 self.channel._get = Mock(side_effect=Empty) 655 basic_get_result = self.channel.basic_get(mock_queue) 656 assert self.channel.Message.call_count == 0 657 assert basic_get_result is None 658 659 660@skip.if_python3() 661@skip.if_pypy() 662class test_Channel_basic_cancel(ChannelTestBase): 663 664 @pytest.fixture(autouse=True) 665 def setup_receivers(self, setup_channel): 666 self.channel._receivers = {1: Mock()} 667 668 def test_channel_basic_cancel_no_error_if_consumer_tag_not_found(self): 669 self.channel.basic_cancel(2) 670 671 def test_channel_basic_cancel_pops_receiver(self): 672 self.channel.basic_cancel(1) 673 assert 1 not in self.channel._receivers 674 675 def test_channel_basic_cancel_closes_receiver(self): 676 mock_receiver = self.channel._receivers[1] 677 self.channel.basic_cancel(1) 678 mock_receiver.close.assert_called_once_with() 679 680 def test_channel_basic_cancel_pops__tag_to_queue(self): 681 self.channel._tag_to_queue = Mock() 682 self.channel.basic_cancel(1) 683 self.channel._tag_to_queue.pop.assert_called_once_with(1, None) 684 685 def test_channel_basic_cancel_pops_connection__callbacks(self): 686 self.channel._tag_to_queue = Mock() 687 self.channel.basic_cancel(1) 688 mock_queue = self.channel._tag_to_queue.pop.return_value 689 self.conn._callbacks.pop.assert_called_once_with(mock_queue, None) 690 691 692@skip.if_python3() 693@skip.if_pypy() 694class test_Channel__init__(ChannelTestBase): 695 696 def 
test_channel___init__sets_variables_as_expected(self): 697 assert self.conn is self.channel.connection 698 assert self.transport is self.channel.transport 699 assert not self.channel.closed 700 self.conn.get_qpid_connection.assert_called_once_with() 701 expected_broker_agent = self.mock_broker_agent.return_value 702 assert self.channel._broker is expected_broker_agent 703 assert self.channel._tag_to_queue == {} 704 assert self.channel._receivers == {} 705 assert self.channel._qos is None 706 707 708@skip.if_python3() 709@skip.if_pypy() 710class test_Channel_basic_consume(ChannelTestBase): 711 712 @pytest.fixture(autouse=True) 713 def setup_callbacks(self, setup_channel): 714 self.conn._callbacks = {} 715 716 def test_channel_basic_consume_adds_queue_to__tag_to_queue(self): 717 mock_tag = Mock() 718 mock_queue = Mock() 719 self.channel.basic_consume(mock_queue, Mock(), Mock(), mock_tag) 720 expected_dict = {mock_tag: mock_queue} 721 assert expected_dict == self.channel._tag_to_queue 722 723 def test_channel_basic_consume_adds_entry_to_connection__callbacks(self): 724 mock_queue = Mock() 725 self.channel.basic_consume(mock_queue, Mock(), Mock(), Mock()) 726 assert mock_queue in self.conn._callbacks 727 assert isinstance(self.conn._callbacks[mock_queue], Callable) 728 729 def test_channel_basic_consume_creates_new_receiver(self): 730 mock_queue = Mock() 731 self.channel.basic_consume(mock_queue, Mock(), Mock(), Mock()) 732 self.transport.session.receiver.assert_called_once_with(mock_queue) 733 734 def test_channel_basic_consume_saves_new_receiver(self): 735 mock_tag = Mock() 736 self.channel.basic_consume(Mock(), Mock(), Mock(), mock_tag) 737 new_mock_receiver = self.transport.session.receiver.return_value 738 expected_dict = {mock_tag: new_mock_receiver} 739 assert expected_dict == self.channel._receivers 740 741 def test_channel_basic_consume_sets_capacity_on_new_receiver(self): 742 mock_prefetch_count = Mock() 743 self.channel.qos.prefetch_count = 
mock_prefetch_count 744 self.channel.basic_consume(Mock(), Mock(), Mock(), Mock()) 745 new_receiver = self.transport.session.receiver.return_value 746 assert new_receiver.capacity is mock_prefetch_count 747 748 def get_callback(self, no_ack=Mock(), original_cb=Mock()): 749 self.channel.Message = Mock() 750 mock_queue = Mock() 751 self.channel.basic_consume(mock_queue, no_ack, original_cb, Mock()) 752 return self.conn._callbacks[mock_queue] 753 754 def test_channel_basic_consume_callback_creates_Message_correctly(self): 755 callback = self.get_callback() 756 mock_qpid_message = Mock() 757 callback(mock_qpid_message) 758 mock_content = mock_qpid_message.content 759 self.channel.Message.assert_called_once_with( 760 mock_content, channel=self.channel, 761 ) 762 763 def test_channel_basic_consume_callback_adds_message_to_QoS(self): 764 self.channel._qos = Mock() 765 callback = self.get_callback() 766 mock_qpid_message = Mock() 767 callback(mock_qpid_message) 768 mock_delivery_tag = self.channel.Message.return_value.delivery_tag 769 self.channel._qos.append.assert_called_once_with( 770 mock_qpid_message, mock_delivery_tag, 771 ) 772 773 def test_channel_basic_consume_callback_gratuitously_acks(self): 774 self.channel.basic_ack = Mock() 775 callback = self.get_callback() 776 mock_qpid_message = Mock() 777 callback(mock_qpid_message) 778 mock_delivery_tag = self.channel.Message.return_value.delivery_tag 779 self.channel.basic_ack.assert_called_once_with(mock_delivery_tag) 780 781 def test_channel_basic_consume_callback_does_not_ack_when_needed(self): 782 self.channel.basic_ack = Mock() 783 callback = self.get_callback(no_ack=False) 784 mock_qpid_message = Mock() 785 callback(mock_qpid_message) 786 self.channel.basic_ack.assert_not_called() 787 788 def test_channel_basic_consume_callback_calls_real_callback(self): 789 self.channel.basic_ack = Mock() 790 mock_original_callback = Mock() 791 callback = self.get_callback(original_cb=mock_original_callback) 792 mock_qpid_message 
= Mock() 793 callback(mock_qpid_message) 794 expected_message = self.channel.Message.return_value 795 mock_original_callback.assert_called_once_with(expected_message) 796 797 798@skip.if_python3() 799@skip.if_pypy() 800class test_Channel_queue_delete(ChannelTestBase): 801 802 @pytest.fixture(autouse=True) 803 def setup_channel_patches(self, patching, setup_channel): 804 self.mock__has_queue = patching.object(self.channel, '_has_queue') 805 self.mock__size = patching.object(self.channel, '_size') 806 self.mock__delete = patching.object(self.channel, '_delete') 807 self.mock_queue = Mock() 808 809 def test_checks_if_queue_exists(self): 810 self.channel.queue_delete(self.mock_queue) 811 self.mock__has_queue.assert_called_once_with(self.mock_queue) 812 813 def test_does_nothing_if_queue_does_not_exist(self): 814 self.mock__has_queue.return_value = False 815 self.channel.queue_delete(self.mock_queue) 816 self.mock__delete.assert_not_called() 817 818 def test_not_empty_and_if_empty_True_no_delete(self): 819 self.mock__size.return_value = 1 820 self.channel.queue_delete(self.mock_queue, if_empty=True) 821 mock_broker = self.mock_broker_agent.return_value 822 mock_broker.getQueue.assert_not_called() 823 824 def test_calls_get_queue(self): 825 self.channel.queue_delete(self.mock_queue) 826 getQueue = self.mock_broker_agent.return_value.getQueue 827 getQueue.assert_called_once_with(self.mock_queue) 828 829 def test_gets_queue_attribute(self): 830 self.channel.queue_delete(self.mock_queue) 831 queue_obj = self.mock_broker_agent.return_value.getQueue.return_value 832 queue_obj.getAttributes.assert_called_once_with() 833 834 def test_queue_in_use_and_if_unused_no_delete(self): 835 queue_obj = self.mock_broker_agent.return_value.getQueue.return_value 836 queue_obj.getAttributes.return_value = {'consumerCount': 1} 837 self.channel.queue_delete(self.mock_queue, if_unused=True) 838 self.mock__delete.assert_not_called() 839 840 def test_calls__delete_with_queue(self): 841 
self.channel.queue_delete(self.mock_queue)
self.mock__delete.assert_called_once_with(self.mock_queue)
# NOTE(review): the two statements above are the tail of a test method whose
# ``def`` line precedes this section; they belong to that method's body.


@skip.if_python3()
@skip.if_pypy()
class test_Channel(object):
    """Unit tests for Channel, run against a mocked qpidtoollibs broker."""

    @patch(QPID_MODULE + '.qpidtoollibs')
    def setup(self, mock_qpidtoollibs):
        """Build a Channel wired to mock connection, transport and broker."""
        self.mock_connection = Mock()
        self.mock_qpid_connection = Mock()
        self.mock_qpid_session = Mock()
        self.mock_qpid_connection.session = Mock(
            return_value=self.mock_qpid_session,
        )
        self.mock_connection.get_qpid_connection = Mock(
            return_value=self.mock_qpid_connection,
        )
        self.mock_transport = Mock()
        self.mock_broker = Mock()
        self.mock_Message = Mock()
        # BrokerAgent(...) hands back the mock broker the tests assert on.
        self.mock_BrokerAgent = mock_qpidtoollibs.BrokerAgent
        self.mock_BrokerAgent.return_value = self.mock_broker
        self.my_channel = Channel(
            self.mock_connection, self.mock_transport,
        )
        self.my_channel.Message = self.mock_Message

    def test_verify_QoS_class_attribute(self):
        """Verify that the class attribute QoS refers to the QoS object"""
        assert QoS is Channel.QoS

    def test_verify_Message_class_attribute(self):
        """Verify that the class attribute Message refers to the Message
        object."""
        assert Message is Channel.Message

    def test_body_encoding_class_attribute(self):
        """Verify that the class attribute body_encoding is set to base64"""
        assert Channel.body_encoding == 'base64'

    def test_codecs_class_attribute(self):
        """Verify that the codecs class attribute has a correct key and
        value."""
        assert isinstance(Channel.codecs, dict)
        assert 'base64' in Channel.codecs
        assert isinstance(Channel.codecs['base64'], Base64)

    def test_size(self):
        """Test getting the number of messages in a queue specified by
        name and returning them."""
        message_count = 5
        mock_queue = Mock()
        mock_queue_to_check = Mock()
        mock_queue_to_check.values = {'msgDepth': message_count}
        self.mock_broker.getQueue.return_value = mock_queue_to_check
        result = self.my_channel._size(mock_queue)
        self.mock_broker.getQueue.assert_called_with(mock_queue)
        assert message_count == result

    def test_delete(self):
        """Test deleting a queue calls purge and delQueue with queue name."""
        mock_queue = Mock()
        self.my_channel._purge = Mock()
        result = self.my_channel._delete(mock_queue)
        self.my_channel._purge.assert_called_with(mock_queue)
        self.mock_broker.delQueue.assert_called_with(mock_queue)
        assert result is None

    def test_has_queue_true(self):
        """Test checking if a queue exists, and it does."""
        mock_queue = Mock()
        self.mock_broker.getQueue.return_value = True
        result = self.my_channel._has_queue(mock_queue)
        assert result

    def test_has_queue_false(self):
        """Test checking if a queue exists, and it does not."""
        mock_queue = Mock()
        self.mock_broker.getQueue.return_value = False
        result = self.my_channel._has_queue(mock_queue)
        assert not result

    @patch('amqp.protocol.queue_declare_ok_t')
    def test_queue_declare_with_exception_raised(self,
                                                 mock_queue_declare_ok_t):
        """Test declare_queue, where an exception is raised and silenced."""
        mock_queue = Mock()
        mock_passive = Mock()
        mock_durable = Mock()
        mock_exclusive = Mock()
        mock_auto_delete = Mock()
        mock_nowait = Mock()
        mock_arguments = Mock()
        mock_msg_count = Mock()
        mock_queue.startswith.return_value = False
        mock_queue.endswith.return_value = False
        options = {
            'passive': mock_passive,
            'durable': mock_durable,
            'exclusive': mock_exclusive,
            'auto-delete': mock_auto_delete,
            'arguments': mock_arguments,
        }
        mock_consumer_count = Mock()
        mock_return_value = Mock()
        values_dict = {
            'msgDepth': mock_msg_count,
            'consumerCount': mock_consumer_count,
        }
        mock_queue_data = Mock()
        mock_queue_data.values = values_dict
        exception_to_raise = Exception('The foo object already exists.')
        self.mock_broker.addQueue.side_effect = exception_to_raise
        self.mock_broker.getQueue.return_value = mock_queue_data
        mock_queue_declare_ok_t.return_value = mock_return_value
        result = self.my_channel.queue_declare(
            mock_queue,
            passive=mock_passive,
            durable=mock_durable,
            exclusive=mock_exclusive,
            auto_delete=mock_auto_delete,
            nowait=mock_nowait,
            arguments=mock_arguments,
        )
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=options,
        )
        mock_queue_declare_ok_t.assert_called_with(
            mock_queue, mock_msg_count, mock_consumer_count,
        )
        assert mock_return_value is result

    def test_queue_declare_set_ring_policy_for_celeryev(self):
        """Test declare_queue sets ring_policy for celeryev."""
        mock_queue = Mock()
        mock_queue.startswith.return_value = True
        mock_queue.endswith.return_value = False
        expected_default_options = {
            'passive': False,
            'durable': False,
            'exclusive': False,
            'auto-delete': True,
            'arguments': None,
            'qpid.policy_type': 'ring',
        }
        mock_msg_count = Mock()
        mock_consumer_count = Mock()
        values_dict = {
            'msgDepth': mock_msg_count,
            'consumerCount': mock_consumer_count,
        }
        mock_queue_data = Mock()
        mock_queue_data.values = values_dict
        self.mock_broker.addQueue.return_value = None
        self.mock_broker.getQueue.return_value = mock_queue_data
        self.my_channel.queue_declare(mock_queue)
        mock_queue.startswith.assert_called_with('celeryev')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=expected_default_options,
        )

    def test_queue_declare_set_ring_policy_for_pidbox(self):
        """Test declare_queue sets ring_policy for pidbox."""
        mock_queue = Mock()
        mock_queue.startswith.return_value = False
        mock_queue.endswith.return_value = True
        expected_default_options = {
            'passive': False,
            'durable': False,
            'exclusive': False,
            'auto-delete': True,
            'arguments': None,
            'qpid.policy_type': 'ring',
        }
        mock_msg_count = Mock()
        mock_consumer_count = Mock()
        values_dict = {
            'msgDepth': mock_msg_count,
            'consumerCount': mock_consumer_count,
        }
        mock_queue_data = Mock()
        mock_queue_data.values = values_dict
        self.mock_broker.addQueue.return_value = None
        self.mock_broker.getQueue.return_value = mock_queue_data
        self.my_channel.queue_declare(mock_queue)
        mock_queue.endswith.assert_called_with('pidbox')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=expected_default_options,
        )

    def test_queue_declare_ring_policy_not_set_as_expected(self):
        """Test declare_queue does not set ring_policy as expected."""
        mock_queue = Mock()
        mock_queue.startswith.return_value = False
        mock_queue.endswith.return_value = False
        expected_default_options = {
            'passive': False,
            'durable': False,
            'exclusive': False,
            'auto-delete': True,
            'arguments': None,
        }
        mock_msg_count = Mock()
        mock_consumer_count = Mock()
        values_dict = {
            'msgDepth': mock_msg_count,
            'consumerCount': mock_consumer_count,
        }
        mock_queue_data = Mock()
        mock_queue_data.values = values_dict
        self.mock_broker.addQueue.return_value = None
        self.mock_broker.getQueue.return_value = mock_queue_data
        self.my_channel.queue_declare(mock_queue)
        mock_queue.startswith.assert_called_with('celeryev')
        mock_queue.endswith.assert_called_with('pidbox')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=expected_default_options,
        )

    def test_queue_declare_test_defaults(self):
        """Test declare_queue defaults."""
        mock_queue = Mock()
        mock_queue.startswith.return_value = False
        mock_queue.endswith.return_value = False
        expected_default_options = {
            'passive': False,
            'durable': False,
            'exclusive': False,
            'auto-delete': True,
            'arguments': None,
        }
        mock_msg_count = Mock()
        mock_consumer_count = Mock()
        values_dict = {
            'msgDepth': mock_msg_count,
            'consumerCount': mock_consumer_count,
        }
        mock_queue_data = Mock()
        mock_queue_data.values = values_dict
        self.mock_broker.addQueue.return_value = None
        self.mock_broker.getQueue.return_value = mock_queue_data
        self.my_channel.queue_declare(mock_queue)
        self.mock_broker.addQueue.assert_called_with(
            mock_queue,
            options=expected_default_options,
        )

    def test_queue_declare_raises_exception_not_silenced(self):
        """Test declare_queue re-raises an unrecognized addQueue error."""
        unique_exception = Exception('This exception should not be silenced')
        mock_queue = Mock()
        self.mock_broker.addQueue.side_effect = unique_exception
        with pytest.raises(unique_exception.__class__):
            self.my_channel.queue_declare(mock_queue)
        # Checked after the ``raises`` block so the assertion actually runs.
        # A bare Mock queue name answers startswith()/endswith() with truthy
        # Mocks, hence the 'ring' policy in the expected options.
        self.mock_broker.addQueue.assert_called_once_with(
            mock_queue,
            options={
                'exclusive': False,
                'durable': False,
                'qpid.policy_type': 'ring',
                'passive': False,
                'arguments': None,
                'auto-delete': True
            })

    def test_exchange_declare_raises_exception_and_silenced(self):
        """Create exchange where an exception is raised and then silenced"""
        self.mock_broker.addExchange.side_effect = Exception(
            'The foo object already exists.',
        )
        self.my_channel.exchange_declare()

    def test_exchange_declare_raises_exception_not_silenced(self):
        """Create Exchange where an exception is raised and not silenced."""
        unique_exception = Exception('This exception should not be silenced')
        self.mock_broker.addExchange.side_effect = unique_exception
        with pytest.raises(unique_exception.__class__):
            self.my_channel.exchange_declare()

    def test_exchange_declare(self):
        """Create Exchange where an exception is NOT raised."""
        mock_exchange = Mock()
        mock_type = Mock()
        mock_durable = Mock()
        options = {'durable': mock_durable}
        result = self.my_channel.exchange_declare(
            mock_exchange, mock_type, mock_durable,
        )
        self.mock_broker.addExchange.assert_called_with(
            mock_type, mock_exchange, options,
        )
        assert result is None

    def test_exchange_delete(self):
        """Test the deletion of an exchange by name."""
        mock_exchange = Mock()
        result = self.my_channel.exchange_delete(mock_exchange)
        self.mock_broker.delExchange.assert_called_with(mock_exchange)
        assert result is None

    def test_queue_bind(self):
        """Test binding a queue to an exchange using a routing key."""
        mock_queue = Mock()
        mock_exchange = Mock()
        mock_routing_key = Mock()
        self.my_channel.queue_bind(
            mock_queue, mock_exchange, mock_routing_key,
        )
        self.mock_broker.bind.assert_called_with(
            mock_exchange, mock_queue, mock_routing_key,
        )

    def test_queue_unbind(self):
        """Test unbinding a queue from an exchange using a routing key."""
        mock_queue = Mock()
        mock_exchange = Mock()
        mock_routing_key = Mock()
        self.my_channel.queue_unbind(
            mock_queue, mock_exchange, mock_routing_key,
        )
        self.mock_broker.unbind.assert_called_with(
            mock_exchange, mock_queue, mock_routing_key,
        )

    def test_queue_purge(self):
        """Test purging a queue by name."""
        mock_queue = Mock()
        purge_result = Mock()
        self.my_channel._purge = Mock(return_value=purge_result)
        result = self.my_channel.queue_purge(mock_queue)
        self.my_channel._purge.assert_called_with(mock_queue)
        assert purge_result is result

    @patch(QPID_MODULE + '.Channel.qos')
    def test_basic_ack(self, mock_qos):
        """Test that basic_ack calls the QoS object properly."""
        mock_delivery_tag = Mock()
        self.my_channel.basic_ack(mock_delivery_tag)
        mock_qos.ack.assert_called_with(mock_delivery_tag)

    @patch(QPID_MODULE + '.Channel.qos')
    def test_basic_reject(self, mock_qos):
        """Test that basic_reject calls the QoS object properly."""
        mock_delivery_tag = Mock()
        mock_requeue_value = Mock()
        self.my_channel.basic_reject(mock_delivery_tag, mock_requeue_value)
        mock_qos.reject.assert_called_with(
            mock_delivery_tag, requeue=mock_requeue_value,
        )

    def test_qos_manager_is_none(self):
        """Test the qos property if the QoS object did not already exist."""
        self.my_channel._qos = None
        result = self.my_channel.qos
        assert isinstance(result, QoS)
        # Identity, not equality: the property must hand back the very
        # object it stored on ``_qos``.
        assert result is self.my_channel._qos

    def test_qos_manager_already_exists(self):
        """Test the qos property if the QoS object already exists."""
        mock_existing_qos = Mock()
        self.my_channel._qos = mock_existing_qos
        result = self.my_channel.qos
        assert mock_existing_qos is result

    def test_prepare_message(self):
        """Test that prepare_message() returns the correct result."""
        mock_body = Mock()
        mock_priority = Mock()
        mock_content_encoding = Mock()
        mock_content_type = Mock()
        mock_header1 = Mock()
        mock_header2 = Mock()
        mock_properties1 = Mock()
        mock_properties2 = Mock()
        headers = {'header1': mock_header1, 'header2': mock_header2}
        properties = {'properties1': mock_properties1,
                      'properties2': mock_properties2}
        result = self.my_channel.prepare_message(
            mock_body,
            priority=mock_priority,
            content_type=mock_content_type,
            content_encoding=mock_content_encoding,
            headers=headers,
            properties=properties)
        assert mock_body is result['body']
        assert mock_content_encoding is result['content-encoding']
        assert mock_content_type is result['content-type']
        assert headers == result['headers']
        assert properties == result['properties']
        assert (mock_priority is
                result['properties']['delivery_info']['priority'])

    @patch('__builtin__.buffer')
    @patch(QPID_MODULE + '.Channel.body_encoding')
    @patch(QPID_MODULE + '.Channel.encode_body')
    @patch(QPID_MODULE + '.Channel._put')
    def test_basic_publish(self, mock_put,
                           mock_encode_body,
                           mock_body_encoding,
                           mock_buffer):
        """Test basic_publish()."""
        mock_original_body = Mock()
        mock_encoded_body = 'this is my encoded body'
        mock_message = {'body': mock_original_body,
                        'properties': {'delivery_info': {}}}
        mock_encode_body.return_value = (
            mock_encoded_body, mock_body_encoding,
        )
        mock_exchange = Mock()
        mock_routing_key = Mock()
        mock_encoded_buffered_body = Mock()
        mock_buffer.return_value = mock_encoded_buffered_body
        self.my_channel.basic_publish(
            mock_message, mock_exchange, mock_routing_key,
        )
        mock_encode_body.assert_called_once_with(
            mock_original_body, mock_body_encoding,
        )
        mock_buffer.assert_called_once_with(mock_encoded_body)
        assert mock_message['body'] is mock_encoded_buffered_body
        assert (mock_message['properties']['body_encoding'] is
                mock_body_encoding)
        assert isinstance(
            mock_message['properties']['delivery_tag'], uuid.UUID)
        assert (mock_message['properties']['delivery_info']['exchange'] is
                mock_exchange)
        assert (mock_message['properties']['delivery_info']['routing_key'] is
                mock_routing_key)
        mock_put.assert_called_with(
            mock_routing_key, mock_message, mock_exchange,
        )

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_encode_body_expected_encoding(self, mock_codecs):
        """Test if encode_body() works when encoding is set correctly"""
        mock_body = Mock()
        mock_encoder = Mock()
        mock_encoded_result = Mock()
        mock_codecs.get.return_value = mock_encoder
        mock_encoder.encode.return_value = mock_encoded_result
        result = self.my_channel.encode_body(mock_body, encoding='base64')
        expected_result = (mock_encoded_result, 'base64')
        assert expected_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_encode_body_not_expected_encoding(self, mock_codecs):
        """Test if encode_body() works when encoding is not set correctly."""
        mock_body = Mock()
        result = self.my_channel.encode_body(mock_body, encoding=None)
        expected_result = mock_body, None
        assert expected_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_decode_body_expected_encoding(self, mock_codecs):
        """Test if decode_body() works when encoding is set correctly."""
        mock_body = Mock()
        mock_decoder = Mock()
        mock_decoded_result = Mock()
        mock_codecs.get.return_value = mock_decoder
        mock_decoder.decode.return_value = mock_decoded_result
        result = self.my_channel.decode_body(mock_body, encoding='base64')
        assert mock_decoded_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_decode_body_not_expected_encoding(self, mock_codecs):
        """Test if decode_body() works when encoding is not set correctly."""
        mock_body = Mock()
        result = self.my_channel.decode_body(mock_body, encoding=None)
        assert mock_body == result

    def test_typeof_exchange_exists(self):
        """Test that typeof() finds an exchange that already exists."""
        mock_exchange = Mock()
        mock_qpid_exchange = Mock()
        mock_attributes = {}
        mock_type = Mock()
        mock_attributes['type'] = mock_type
        mock_qpid_exchange.getAttributes.return_value = mock_attributes
        self.mock_broker.getExchange.return_value = mock_qpid_exchange
        result = self.my_channel.typeof(mock_exchange)
        assert mock_type is result

    def test_typeof_exchange_does_not_exist(self):
        """Test that typeof() returns the default when the exchange does
        not exist."""
        mock_exchange = Mock()
        mock_default = Mock()
        self.mock_broker.getExchange.return_value = None
        result = self.my_channel.typeof(mock_exchange, default=mock_default)
        assert mock_default is result


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport__init__(object):
    """Tests for what Transport.__init__ calls and sets."""

    @pytest.fixture(autouse=True)
    def mock_verify_runtime_environment(self, patching):
        """Replace Transport.verify_runtime_environment with a mock."""
        self.mock_verify_runtime_environment = patching.object(
            Transport, 'verify_runtime_environment')

    @pytest.fixture(autouse=True)
    def mock_transport_init(self, patching):
        """Replace the base Transport.__init__ with a mock."""
        self.mock_base_Transport__init__ = patching(
            QPID_MODULE + '.base.Transport.__init__')

    def test_Transport___init___calls_verify_runtime_environment(self):
        """Constructing a Transport calls verify_runtime_environment once."""
        Transport(Mock())
        self.mock_verify_runtime_environment.assert_called_once_with()

    def test_transport___init___calls_parent_class___init__(self):
        """Constructing a Transport forwards the client to the base init."""
        m = Mock()
        Transport(m)
        self.mock_base_Transport__init__.assert_called_once_with(m)

    def test_transport___init___sets_use_async_interface_False(self):
        """A new Transport has a falsy use_async_interface."""
        transport = Transport(Mock())
        assert not transport.use_async_interface


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_drain_events(object):
    """Tests for Transport.drain_events timeout and callback handling."""

    @pytest.fixture(autouse=True)
    def setup_self(self, disable_runtime_dependency_check):
        """Create a Transport with a mock session and one queue callback."""
        # ^^ disable_runtime.. must be called before this fixture.
        self.transport = Transport(Mock())
        self.transport.session = Mock()
        self.mock_queue = Mock()
        self.mock_message = Mock()
        self.mock_conn = Mock()
        self.mock_callback = Mock()
        self.mock_conn._callbacks = {self.mock_queue: self.mock_callback}

    def mock_next_receiver(self, timeout):
        """Stand in for session.next_receiver.

        Sleeps 0.3 seconds, then returns a mock receiver whose ``source``
        is ``self.mock_queue`` and whose ``fetch()`` returns
        ``self.mock_message``. The ``timeout`` argument is ignored.
        """
        time.sleep(0.3)
        mock_receiver = Mock()
        mock_receiver.source = self.mock_queue
        mock_receiver.fetch.return_value = self.mock_message
        return mock_receiver

    def test_socket_timeout_raised_when_all_receivers_empty(self):
        """drain_events raises socket.timeout when next_receiver raises
        the (patched) QpidEmpty exception."""
        with patch(QPID_MODULE + '.QpidEmpty', new=QpidException):
            self.transport.session.next_receiver.side_effect = QpidException()
            with pytest.raises(socket.timeout):
                self.transport.drain_events(Mock())

    def test_socket_timeout_raised_when_by_timeout(self):
        """drain_events raises socket.timeout once its timeout elapses."""
        self.transport.session.next_receiver = self.mock_next_receiver
        with pytest.raises(socket.timeout):
            self.transport.drain_events(self.mock_conn, timeout=1)

    def test_timeout_returns_no_earlier_then_asked_for(self):
        """drain_events(timeout=1) takes at least one second to return."""
        self.transport.session.next_receiver = self.mock_next_receiver
        start_time = monotonic()
        try:
            self.transport.drain_events(self.mock_conn, timeout=1)
        except socket.timeout:
            pass
        elapsed_time_in_s = monotonic() - start_time
        assert elapsed_time_in_s >= 1.0

    def test_callback_is_called(self):
        """drain_events passes each fetched message to the callback that
        the connection registered for the receiver's source queue."""
        self.transport.session.next_receiver = self.mock_next_receiver
        try:
            self.transport.drain_events(self.mock_conn, timeout=1)
        except socket.timeout:
            pass
        self.mock_callback.assert_called_with(self.mock_message)


@skip.if_python3()
@skip.if_pypy()
class test_Transport_create_channel(object):
    """Tests for Transport.create_channel."""

    @pytest.fixture(autouse=True)
    def setup_self(self, disable_runtime_dependency_check):
        """Call create_channel once on a mock connection."""
        # ^^ disable runtime MUST be called before this fixture
        self.transport = Transport(Mock())
        self.mock_conn = Mock()
        self.mock_new_channel = Mock()
        self.mock_conn.Channel.return_value = self.mock_new_channel
        self.returned_channel = self.transport.create_channel(self.mock_conn)

    def test_new_channel_created_from_connection(self):
        """The channel comes from connection.Channel(connection, transport)
        and is returned to the caller."""
        assert self.mock_new_channel is self.returned_channel
        self.mock_conn.Channel.assert_called_with(
            self.mock_conn, self.transport,
        )

    def test_new_channel_added_to_connection_channel_list(self):
        """The new channel is appended to connection.channels."""
        append_method = self.mock_conn.channels.append
        append_method.assert_called_with(self.mock_new_channel)


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_establish_connection(object):
    """Tests for the keyword arguments Transport.establish_connection
    passes to Connection, and for the handlers it installs."""

    @pytest.fixture(autouse=True)
    def setup_self(self, disable_runtime_dependency_check):
        """Create a Transport around a plain client object and swap its
        Connection class for a mock."""

        class MockClient(object):
            pass

        self.client = MockClient()
        self.client.connect_timeout = 4
        self.client.ssl = False
        self.client.transport_options = {}
        self.client.userid = None
        self.client.password = None
        self.client.login_method = None
        self.transport = Transport(self.client)
        self.mock_conn = Mock()
        self.transport.Connection = self.mock_conn

    def test_transport_establish_conn_new_option_overwrites_default(self):
        """A userid and password are passed on and select PLAIN."""
        self.client.userid = 'new-userid'
        self.client.password = 'new-password'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            username=self.client.userid,
            password=self.client.password,
            sasl_mechanisms='PLAIN',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_empty_client_is_default(self):
        """With no credentials the defaults and ANONYMOUS are used."""
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_additional_transport_option(self):
        """Entries in client.transport_options are passed to Connection."""
        new_param_value = 'mynewparam'
        self.client.transport_options['new_param'] = new_param_value
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            timeout=4,
            new_param=new_param_value,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_transform_localhost_to_127_0_0_1(self):
        """A 'localhost' hostname reaches Connection as host='localhost'.

        NOTE(review): the test name mentions 127.0.0.1, but the assertion
        expects the hostname to be passed through unchanged.
        """
        self.client.hostname = 'localhost'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_password_no_userid_raises_exception(self):
        """A password without a userid makes establish_connection raise."""
        self.client.password = 'somepass'
        with pytest.raises(Exception):
            self.transport.establish_connection()

    def test_transport_userid_no_password_raises_exception(self):
        """A userid without a password makes establish_connection raise."""
        self.client.userid = 'someusername'
        with pytest.raises(Exception):
            self.transport.establish_connection()

    def test_transport_overrides_sasl_mech_from_login_method(self):
        """client.login_method is used as the sasl_mechanisms value."""
        self.client.login_method = 'EXTERNAL'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='EXTERNAL',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_overrides_sasl_mech_has_username(self):
        """With a login_method and a userid, the username is passed along
        with the overriding sasl_mechanisms value."""
        self.client.userid = 'new-userid'
        self.client.login_method = 'EXTERNAL'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            username=self.client.userid,
            sasl_mechanisms='EXTERNAL',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_set_password(self):
        """Both username and password are forwarded to Connection."""
        self.client.userid = 'someuser'
        self.client.password = 'somepass'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            username='someuser',
            password='somepass',
            sasl_mechanisms='PLAIN',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_no_ssl_sets_transport_tcp(self):
        """client.ssl = False results in transport='tcp'."""
        self.client.ssl = False
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_establish_conn_with_ssl_with_hostname_check(self):
        """SSL options with CERT_REQUIRED map to ssl_* keyword arguments
        with ssl_skip_hostname_check=False and transport='ssl'."""
        self.client.ssl = {
            'keyfile': 'my_keyfile',
            'certfile': 'my_certfile',
            'ca_certs': 'my_cacerts',
            'cert_reqs': ssl.CERT_REQUIRED,
        }
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            ssl_certfile='my_certfile',
            ssl_trustfile='my_cacerts',
            timeout=4,
            ssl_skip_hostname_check=False,
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            ssl_keyfile='my_keyfile',
            port=5672, transport='ssl',
        )

    def test_transport_establish_conn_with_ssl_skip_hostname_check(self):
        """SSL options with CERT_OPTIONAL map to ssl_* keyword arguments
        with ssl_skip_hostname_check=True and transport='ssl'."""
        self.client.ssl = {
            'keyfile': 'my_keyfile',
            'certfile': 'my_certfile',
            'ca_certs': 'my_cacerts',
            'cert_reqs': ssl.CERT_OPTIONAL,
        }
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            ssl_certfile='my_certfile',
            ssl_trustfile='my_cacerts',
            timeout=4,
            ssl_skip_hostname_check=True,
            sasl_mechanisms='ANONYMOUS',
            host='localhost',
            ssl_keyfile='my_keyfile',
            port=5672, transport='ssl',
        )

    def test_transport_establish_conn_sets_client_on_connection_object(self):
        """The created connection gets the transport's client as .client."""
        self.transport.establish_connection()
        assert self.mock_conn.return_value.client is self.client

    def test_transport_establish_conn_creates_session_on_transport(self):
        """transport.session is the session of the new qpid connection."""
        self.transport.establish_connection()
        qpid_conn = self.mock_conn.return_value.get_qpid_connection
        new_mock_session = qpid_conn.return_value.session.return_value
        assert self.transport.session is new_mock_session

    def test_transport_establish_conn_returns_new_connection_object(self):
        """establish_connection returns the object Connection(...) made."""
        new_conn = self.transport.establish_connection()
        assert new_conn is self.mock_conn.return_value

    def test_transport_establish_conn_uses_hostname_if_not_default(self):
        """A non-default client.hostname is passed through as host."""
        self.client.hostname = 'some_other_hostname'
        self.transport.establish_connection()
        self.mock_conn.assert_called_once_with(
            sasl_mechanisms='ANONYMOUS',
            host='some_other_hostname',
            timeout=4,
            port=5672,
            transport='tcp',
        )

    def test_transport_sets_qpid_message_ready_handler(self):
        """The session's message-received handler is set to
        transport._qpid_message_ready_handler."""
        self.transport.establish_connection()
        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
        mock_session = qpid_conn_call.return_value.session.return_value
        mock_set_callback = mock_session.set_message_received_notify_handler
        expected_msg_callback = self.transport._qpid_message_ready_handler
        mock_set_callback.assert_called_once_with(expected_msg_callback)

    def test_transport_sets_session_exception_handler(self):
        """The session's async-exception handler is set to
        transport._qpid_async_exception_notify_handler."""
        self.transport.establish_connection()
        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
        mock_session = qpid_conn_call.return_value.session.return_value
        mock_set_callback = mock_session.set_async_exception_notify_handler
        exc_callback = self.transport._qpid_async_exception_notify_handler
        mock_set_callback.assert_called_once_with(exc_callback)

    def test_transport_sets_connection_exception_handler(self):
        """The qpid connection's async-exception handler is set to
        transport._qpid_async_exception_notify_handler."""
        self.transport.establish_connection()
        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
        qpid_conn = qpid_conn_call.return_value
        mock_set_callback = qpid_conn.set_async_exception_notify_handler
        exc_callback = self.transport._qpid_async_exception_notify_handler
        mock_set_callback.assert_called_once_with(exc_callback)


@skip.if_python3()
@skip.if_pypy()
class test_Transport_class_attributes(object):
    """Tests for the class-level attributes of Transport."""

    def test_verify_Connection_attribute(self):
        """Transport.Connection is the qpid Connection class."""
        assert Connection is Transport.Connection

    def test_verify_polling_disabled(self):
        """Transport.polling_interval is None."""
        assert Transport.polling_interval is None

    def test_verify_driver_type_and_name(self):
        """driver_type and driver_name are both 'qpid'."""
        assert Transport.driver_type == 'qpid'
        assert Transport.driver_name == 'qpid'

    def test_verify_implements_exchange_types(self):
        """fanout, direct and topic are implemented; 'frobnitz' is not."""
        assert 'fanout' in Transport.implements.exchange_type
        assert 'direct' in Transport.implements.exchange_type
        assert 'topic' in Transport.implements.exchange_type
        assert 'frobnitz' not in Transport.implements.exchange_type

    def test_transport_verify_recoverable_connection_errors(self):
        """ConnectionError and select.error are recoverable connection
        errors."""
        connection_errors = Transport.recoverable_connection_errors
        assert ConnectionError in connection_errors
        assert select.error in connection_errors

    def test_transport_verify_recoverable_channel_errors(self):
        """NotFound is a recoverable channel error."""
        channel_errors = Transport.recoverable_channel_errors
        assert NotFound in channel_errors

    def test_transport_verify_pre_kombu_3_0_exception_labels(self):
        """The older channel_errors/connection_errors attributes equal
        their recoverable_* counterparts."""
        assert (Transport.recoverable_channel_errors ==
                Transport.channel_errors)
        assert (Transport.recoverable_connection_errors ==
                Transport.connection_errors)


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_register_with_event_loop(object):
    """Tests for Transport.register_with_event_loop."""

    def test_transport_register_with_event_loop_calls_add_reader(self):
        """Registering adds transport.r to the loop with on_readable as
        the callback and (connection, loop) as its arguments."""
        transport = Transport(Mock())
        mock_connection = Mock()
        mock_loop = Mock()
        transport.register_with_event_loop(mock_connection, mock_loop)
        mock_loop.add_reader.assert_called_with(
            transport.r, transport.on_readable, mock_connection, mock_loop,
        )


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_Qpid_callback_handlers_async(object):
    """Tests for the qpid notify handlers after the transport has been
    registered with an event loop."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching, disable_runtime_dependency_check):
        """Patch os.write and register a Transport with a mock loop."""
        self.mock_os_write = patching(QPID_MODULE + '.os.write')
        self.transport = Transport(Mock())
        self.transport.register_with_event_loop(Mock(), Mock())

    def test__qpid_message_ready_handler_writes_symbol_to_fd(self):
        """The message-ready handler writes '0' to transport._w."""
        self.transport._qpid_message_ready_handler(Mock())
        self.mock_os_write.assert_called_once_with(self.transport._w, '0')

    def test__qpid_async_exception_notify_handler_writes_symbol_to_fd(self):
        """The async-exception handler writes 'e' to transport._w."""
        self.transport._qpid_async_exception_notify_handler(Mock(), Mock())
        self.mock_os_write.assert_called_once_with(self.transport._w, 'e')


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_Qpid_callback_handlers_sync(object):
    """Tests for the qpid notify handlers when the transport has not been
    registered with an event loop."""

    @pytest.fixture(autouse=True)
    def setup(self, patching, disable_runtime_dependency_check):
        """Patch os.write and create an unregistered Transport."""
        self.mock_os_write = patching(QPID_MODULE + '.os.write')
        self.transport = Transport(Mock())

    def test__qpid_message_ready_handler_dows_not_write(self):
        """The message-ready handler does not call os.write."""
        self.transport._qpid_message_ready_handler(Mock())
        self.mock_os_write.assert_not_called()

    def test__qpid_async_exception_notify_handler_does_not_write(self):
        """The async-exception handler does not call os.write."""
        self.transport._qpid_async_exception_notify_handler(Mock(), Mock())
        self.mock_os_write.assert_not_called()


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_on_readable(object):
    """Tests for Transport.on_readable."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching, disable_runtime_dependency_check):
        """Patch os.read and drain_events; register with a mock loop."""
        self.mock_os_read = patching(QPID_MODULE + '.os.read')

        self.mock_drain_events = patching.object(Transport, 'drain_events')
        self.transport = Transport(Mock())
        self.transport.register_with_event_loop(Mock(), Mock())

    def test_transport_on_readable_reads_symbol_from_fd(self):
        """on_readable reads one byte from transport.r."""
        self.transport.on_readable(Mock(), Mock())
        self.mock_os_read.assert_called_once_with(self.transport.r, 1)

    def test_transport_on_readable_calls_drain_events(self):
        """on_readable calls drain_events with the connection."""
        mock_connection = Mock()
        self.transport.on_readable(mock_connection, Mock())
        self.mock_drain_events.assert_called_with(mock_connection)

    def test_transport_on_readable_catches_socket_timeout(self):
        """A socket.timeout from drain_events does not propagate."""
        self.mock_drain_events.side_effect = socket.timeout()
        self.transport.on_readable(Mock(), Mock())

    def test_transport_on_readable_ignores_non_socket_timeout_exception(self):
        """Exceptions other than socket.timeout propagate to the caller."""
        self.mock_drain_events.side_effect = IOError()
        with pytest.raises(IOError):
            self.transport.on_readable(Mock(), Mock())


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_verify_runtime_environment(object):
    """Tests for Transport.verify_runtime_environment."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching):
        """Keep a reference to the real method, then patch it on the class
        so that constructing the Transport uses the mock instead."""
        self.verify_runtime_environment = Transport.verify_runtime_environment
        patching.object(Transport, 'verify_runtime_environment')
        self.transport = Transport(Mock())

    @patch(QPID_MODULE + '.PY3', new=True)
    def test_raises_exception_for_Python3(self):
        """RuntimeError is raised when the module's PY3 flag is true."""
        with pytest.raises(RuntimeError):
            self.verify_runtime_environment(self.transport)

    @patch('__builtin__.getattr')
    def test_raises_exc_for_PyPy(self, mock_getattr):
        """RuntimeError is raised when the builtin getattr is patched to
        return True."""
        mock_getattr.return_value = True
        with pytest.raises(RuntimeError):
            self.verify_runtime_environment(self.transport)

    @patch(QPID_MODULE + '.dependency_is_none')
    def test_raises_exc_dep_missing(self, mock_dep_is_none):
        """RuntimeError is raised when dependency_is_none returns True."""
        mock_dep_is_none.return_value = True
        with pytest.raises(RuntimeError):
            self.verify_runtime_environment(self.transport)

    @patch(QPID_MODULE + '.dependency_is_none')
    def test_calls_dependency_is_none(self, mock_dep_is_none):
        """The check calls dependency_is_none."""
        mock_dep_is_none.return_value = False
        self.verify_runtime_environment(self.transport)
        mock_dep_is_none.assert_called()

    def test_raises_no_exception(self):
        """With the class-level dependency fixture active, the check
        completes without raising."""
        self.verify_runtime_environment(self.transport)


@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport(object):
    """General tests for Transport built around a mock client."""

    def setup(self):
        """Creates a mock_client to be used in testing."""
        self.mock_client = Mock()

    def test_supports_ev(self):
        """Test that the transport claims to support async event loop"""
        assert Transport(self.mock_client).supports_ev

    def test_close_connection(self):
        """Test that close_connection calls close on the connection."""
        my_transport = Transport(self.mock_client)
        mock_connection = Mock()
        my_transport.close_connection(mock_connection)
        mock_connection.close.assert_called_once_with()

    def test_default_connection_params(self):
        """Test that the default_connection_params are correct"""
        correct_params = {
            'hostname': 'localhost',
            'port': 5672,
        }
        my_transport = Transport(self.mock_client)
        result_params = my_transport.default_connection_params
        assert correct_params == result_params

    @patch(QPID_MODULE + '.os.close')
    def test_del_sync(self, close):
        """__del__ on an unregistered transport does not call os.close."""
        my_transport = Transport(self.mock_client)
        my_transport.__del__()
        close.assert_not_called()

    @patch(QPID_MODULE + '.os.close')
    def test_del_async(self, close):
        """__del__ on a loop-registered transport calls os.close."""
        my_transport = Transport(self.mock_client)
        my_transport.register_with_event_loop(Mock(), Mock())
        my_transport.__del__()
        close.assert_called()

    @patch(QPID_MODULE + '.os.close')
    def test_del_async_failed(self, close):
        """An OSError from os.close does not propagate out of __del__."""
        close.side_effect = OSError()
        my_transport = Transport(self.mock_client)
        my_transport.register_with_event_loop(Mock(), Mock())
        my_transport.__del__()
        close.assert_called()