1from __future__ import absolute_import, unicode_literals
2
3import pytest
4import select
5import ssl
6import socket
7import sys
8import time
9import uuid
10
11from collections import OrderedDict
12
13try:
14    from collections.abc import Callable
15except ImportError:
16    from collections import Callable
17
18from itertools import count
19
20from case import Mock, call, patch, skip
21
22from kombu.five import Empty, range, monotonic
23from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection,
24                                  ConnectionError, Message, NotFound, QoS,
25                                  Transport)
26from kombu.transport.virtual import Base64
27
28
29QPID_MODULE = 'kombu.transport.qpid'
30
31
@pytest.fixture
def disable_runtime_dependency_check(patching):
    """Patch ``dependency_is_none`` in the qpid module to return False.

    The object handed back by ``patching`` is returned to the test.
    """
    dependency_check = patching(QPID_MODULE + '.dependency_is_none')
    dependency_check.return_value = False
    return dependency_check
37
38
class QpidException(Exception):
    """
    An object used to mock Exceptions provided by qpid.messaging.exceptions

    :param code: value stored on the ``code`` attribute.
    :param text: value stored on the ``text`` attribute.
    """

    def __init__(self, code=None, text=None):
        # Name this class in super() so that Exception.__init__ is not
        # skipped, and never pass ``self`` as an argument: an exception
        # that holds itself in ``args`` recurses without end in str().
        super(QpidException, self).__init__(code, text)
        self.code = code
        self.text = text
48
49
class BreakOutException(Exception):
    """Plain Exception subclass that adds no behavior of its own.

    NOTE(review): the name suggests tests raise it to break out of a
    loop under test -- confirm against its users elsewhere in the file.
    """
52
53
@skip.if_python3()
@skip.if_pypy()
class test_QoS__init__(object):
    """Checks on the state of a freshly constructed QoS object."""

    def setup(self):
        self.mock_session = Mock()
        self.qos = QoS(self.mock_session)

    def test__init__prefetch_default_set_correct_without_prefetch_value(self):
        assert self.qos.prefetch_count == 1

    def test__init__prefetch_is_hard_set_to_one(self):
        # Request a prefetch of two; without a non-default value this
        # test would only repeat the default-value check above.
        qos_limit_two = QoS(self.mock_session, prefetch_count=2)
        assert qos_limit_two.prefetch_count == 1

    def test__init___not_yet_acked_is_initialized(self):
        assert isinstance(self.qos._not_yet_acked, OrderedDict)
71
72
@skip.if_python3()
@skip.if_pypy()
class test_QoS_can_consume(object):
    """QoS.can_consume() for several prefetch limits and backlogs."""

    def setup(self):
        self.qos = QoS(Mock())

    def _can_consume_with(self, prefetch_count, not_yet_acked):
        """Install the given state on the QoS and ask can_consume()."""
        self.qos.prefetch_count = prefetch_count
        self.qos._not_yet_acked = not_yet_acked
        return self.qos.can_consume()

    def test_True_when_prefetch_limit_is_zero(self):
        assert self._can_consume_with(0, [])

    def test_True_when_len_of__not_yet_acked_is_lt_prefetch_count(self):
        assert self._can_consume_with(3, ['a', 'b'])

    def test_False_when_len_of__not_yet_acked_is_eq_prefetch_count(self):
        assert not self._can_consume_with(3, ['a', 'b', 'c'])
95
96
@skip.if_python3()
@skip.if_pypy()
class test_QoS_can_consume_max_estimate(object):
    """Values returned by QoS.can_consume_max_estimate()."""

    def setup(self):
        self.mock_session = Mock()
        self.qos = QoS(self.mock_session)

    def test_return_one_when_prefetch_count_eq_zero(self):
        self.qos.prefetch_count = 0
        estimate = self.qos.can_consume_max_estimate()
        assert estimate == 1

    def test_return_prefetch_count_sub_len__not_yet_acked(self):
        # Two messages outstanding against a limit of four leaves two.
        self.qos._not_yet_acked = ['a', 'b']
        self.qos.prefetch_count = 4
        estimate = self.qos.can_consume_max_estimate()
        assert estimate == 2
113
114
@skip.if_python3()
@skip.if_pypy()
class test_QoS_ack(object):
    """QoS.ack() bookkeeping and its call into the session."""

    def setup(self):
        self.mock_session = Mock()
        self.qos = QoS(self.mock_session)

    def test_ack_pops__not_yet_acked(self):
        delivery_tag = 1
        self.qos.append(Mock(), delivery_tag)
        assert delivery_tag in self.qos._not_yet_acked
        self.qos.ack(delivery_tag)
        assert delivery_tag not in self.qos._not_yet_acked

    def test_ack_calls_session_acknowledge_with_message(self):
        pending = Mock()
        self.qos.append(pending, 1)
        self.qos.ack(1)
        acknowledge = self.qos.session.acknowledge
        acknowledge.assert_called_with(message=pending)
135
136
@skip.if_python3()
@skip.if_pypy()
class test_QoS_reject(object):
    """QoS.reject() bookkeeping and the disposition it acknowledges with."""

    @pytest.fixture(autouse=True)
    def setup_qpid(self, patching):
        # Replace the qpid module seen by the transport and keep handles
        # on the messaging names the assertions compare against.
        self.mock_qpid = patching(QPID_MODULE + '.qpid')
        messaging = self.mock_qpid.messaging
        self.mock_Disposition = messaging.Disposition
        self.mock_RELEASED = messaging.RELEASED
        self.mock_REJECTED = messaging.REJECTED

    def setup(self):
        self.mock_session = Mock()
        self.mock_message = Mock()
        self.qos = QoS(self.mock_session)

    def test_reject_pops__not_yet_acked(self):
        tag = 1
        self.qos.append(self.mock_message, tag)
        assert tag in self.qos._not_yet_acked
        self.qos.reject(tag)
        assert tag not in self.qos._not_yet_acked

    def test_reject_requeue_true(self):
        self.qos.append(self.mock_message, 1)
        self.qos.reject(1, requeue=True)
        self.mock_Disposition.assert_called_with(self.mock_RELEASED)
        acknowledge = self.qos.session.acknowledge
        acknowledge.assert_called_with(
            message=self.mock_message,
            disposition=self.mock_Disposition.return_value,
        )

    def test_reject_requeue_false(self):
        rejected = Mock()
        self.qos.append(rejected, 1)
        self.qos.reject(1, requeue=False)
        self.mock_Disposition.assert_called_with(self.mock_REJECTED)
        acknowledge = self.qos.session.acknowledge
        acknowledge.assert_called_with(
            message=rejected,
            disposition=self.mock_Disposition.return_value,
        )
176
177
@skip.if_python3()
@skip.if_pypy()
class test_QoS(object):
    """Tests for QoS.append() and QoS.get() using generated messages."""

    def mock_message_factory(self):
        """Create and return a mock message and its delivery_tag."""
        # The next() builtin works on Python 2 and 3 alike, whereas the
        # iterator's .next() method only exists on Python 2.
        m_delivery_tag = next(self.delivery_tag_generator)
        m = 'message %s' % (m_delivery_tag, )
        return m, m_delivery_tag

    def add_n_messages_to_qos(self, n, qos):
        """Add N mock messages into the passed in qos object"""
        for _ in range(n):
            self.add_message_to_qos(qos)

    def add_message_to_qos(self, qos):
        """Add a single mock message into the passed in qos object.

        Uses the mock_message_factory() to create the message and
        delivery_tag.
        """
        m, m_delivery_tag = self.mock_message_factory()
        qos.append(m, m_delivery_tag)

    def setup(self):
        self.mock_session = Mock()
        self.qos_no_limit = QoS(self.mock_session)
        self.qos_limit_2 = QoS(self.mock_session, prefetch_count=2)
        # Delivery tags handed out by mock_message_factory() start at 1.
        self.delivery_tag_generator = count(1)

    def test_append(self):
        """Append two messages and check inside the QoS object that they
        were put into the internal data structures correctly
        """
        qos = self.qos_no_limit
        m1, m1_tag = self.mock_message_factory()
        m2, m2_tag = self.mock_message_factory()
        qos.append(m1, m1_tag)
        length_not_yet_acked = len(qos._not_yet_acked)
        assert length_not_yet_acked == 1
        checked_message1 = qos._not_yet_acked[m1_tag]
        assert m1 is checked_message1
        qos.append(m2, m2_tag)
        length_not_yet_acked = len(qos._not_yet_acked)
        assert length_not_yet_acked == 2
        checked_message2 = qos._not_yet_acked[m2_tag]
        assert m2 is checked_message2

    def test_get(self):
        """Append two messages, and use get to receive them"""
        qos = self.qos_no_limit
        m1, m1_tag = self.mock_message_factory()
        m2, m2_tag = self.mock_message_factory()
        qos.append(m1, m1_tag)
        qos.append(m2, m2_tag)
        message1 = qos.get(m1_tag)
        message2 = qos.get(m2_tag)
        assert m1 is message1
        assert m2 is message2
237
238
@skip.if_python3()
@skip.if_pypy()
class ConnectionTestBase(object):
    """Base class that builds a Connection against a patched qpid module."""

    @patch(QPID_MODULE + '.qpid')
    def setup(self, mock_qpid):
        # Keep the patched Connection class so subclasses can inspect
        # the calls made while the Connection below was constructed.
        self.mock_qpid_connection = mock_qpid.messaging.Connection
        self.connection_options = {
            'host': 'localhost',
            'port': 5672,
            'transport': 'tcp',
            'timeout': 10,
            'sasl_mechanisms': 'ANONYMOUS',
        }
        self.conn = Connection(**self.connection_options)
254
255
@skip.if_python3()
@skip.if_pypy()
class test_Connection__init__(ConnectionTestBase):
    """Tests for Connection construction and its error translation.

    This class is not a unittest.TestCase, so it has no ``self.fail``;
    ``pytest.fail`` is used to report a missing exception.
    """

    def test_stores_connection_options(self):
        # ensure that only one mech was passed into connection. The other
        # options should all be passed through as-is
        modified_conn_opts = self.connection_options
        assert modified_conn_opts == self.conn.connection_options

    def test_class_variables(self):
        assert isinstance(self.conn.channels, list)
        assert isinstance(self.conn._callbacks, dict)

    def test_establishes_connection(self):
        modified_conn_opts = self.connection_options
        self.mock_qpid_connection.establish.assert_called_with(
            **modified_conn_opts
        )

    def test_saves_established_connection(self):
        created_conn = self.mock_qpid_connection.establish.return_value
        assert self.conn._qpid_conn is created_conn

    @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, ))
    @patch(QPID_MODULE + '.sys.exc_info')
    @patch(QPID_MODULE + '.qpid')
    def test_mutates_ConnError_by_message(self, mock_qpid, mock_exc_info):
        text = 'connection-forced: Authentication failed(320)'
        my_conn_error = QpidException(text=text)
        mock_qpid.messaging.Connection.establish.side_effect = my_conn_error
        mock_exc_info.return_value = 'a', 'b', None
        try:
            self.conn = Connection(**self.connection_options)
        except AuthenticationFailure as error:
            # sys.exc_info is patched above, so this is the mocked tuple.
            exc_info = sys.exc_info()
            assert not isinstance(error, QpidException)
            assert exc_info[1] == 'b'
            assert exc_info[2] is None
        else:
            pytest.fail('ConnectionError type was not mutated correctly')

    @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, ))
    @patch(QPID_MODULE + '.sys.exc_info')
    @patch(QPID_MODULE + '.qpid')
    def test_mutates_ConnError_by_code(self, mock_qpid, mock_exc_info):
        my_conn_error = QpidException(code=320, text='someothertext')
        mock_qpid.messaging.Connection.establish.side_effect = my_conn_error
        mock_exc_info.return_value = 'a', 'b', None
        try:
            self.conn = Connection(**self.connection_options)
        except AuthenticationFailure as error:
            # sys.exc_info is patched above, so this is the mocked tuple.
            exc_info = sys.exc_info()
            assert not isinstance(error, QpidException)
            assert exc_info[1] == 'b'
            assert exc_info[2] is None
        else:
            pytest.fail('ConnectionError type was not mutated correctly')

    @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, ))
    @patch(QPID_MODULE + '.sys.exc_info')
    @patch(QPID_MODULE + '.qpid')
    def test_connection__init__mutates_ConnError_by_message2(self, mock_qpid,
                                                             mock_exc_info):
        """
        Test for PLAIN connection via python-saslwrapper, sans cyrus-sasl-plain

        This test is specific for what is returned when we attempt to connect
        with PLAIN mech and python-saslwrapper is installed, but
        cyrus-sasl-plain is not installed.
        """
        my_conn_error = QpidException()
        my_conn_error.text = 'Error in sasl_client_start (-4) SASL(-4): no '\
                             'mechanism available'
        mock_qpid.messaging.Connection.establish.side_effect = my_conn_error
        mock_exc_info.return_value = ('a', 'b', None)
        try:
            self.conn = Connection(**self.connection_options)
        except AuthenticationFailure as error:
            # sys.exc_info is patched above, so this is the mocked tuple.
            exc_info = sys.exc_info()
            assert not isinstance(error, QpidException)
            assert exc_info[1] == 'b'
            assert exc_info[2] is None
        else:
            pytest.fail('ConnectionError type was not mutated correctly')

    @patch(QPID_MODULE + '.ConnectionError', new=(QpidException, ))
    @patch(QPID_MODULE + '.sys.exc_info')
    @patch(QPID_MODULE + '.qpid')
    def test_unknown_connection_error(self, mock_qpid, mock_exc_info):
        # If we get a connection error that we don't understand,
        # bubble it up as-is
        my_conn_error = QpidException(code=999, text='someothertext')
        mock_qpid.messaging.Connection.establish.side_effect = my_conn_error
        mock_exc_info.return_value = 'a', 'b', None
        try:
            self.conn = Connection(**self.connection_options)
        except Exception as error:
            assert error.code == 999
        else:
            pytest.fail('Connection should have thrown an exception')

    @patch.object(Transport, 'channel_errors', new=(QpidException, ))
    @patch(QPID_MODULE + '.qpid')
    @patch(QPID_MODULE + '.ConnectionError', new=IOError)
    def test_non_qpid_error_raises(self, mock_qpid):
        mock_Qpid_Connection = mock_qpid.messaging.Connection
        my_conn_error = SyntaxError()
        my_conn_error.text = 'some non auth related error message'
        mock_Qpid_Connection.establish.side_effect = my_conn_error
        with pytest.raises(SyntaxError):
            Connection(**self.connection_options)

    @patch(QPID_MODULE + '.qpid')
    @patch(QPID_MODULE + '.ConnectionError', new=IOError)
    def test_non_auth_conn_error_raises(self, mock_qpid):
        mock_Qpid_Connection = mock_qpid.messaging.Connection
        my_conn_error = IOError()
        my_conn_error.text = 'some non auth related error message'
        mock_Qpid_Connection.establish.side_effect = my_conn_error
        with pytest.raises(IOError):
            Connection(**self.connection_options)
378
379
@skip.if_python3()
@skip.if_pypy()
class test_Connection_class_attributes(ConnectionTestBase):
    """Class-level attributes exposed by Connection."""

    def test_connection_verify_class_attributes(self):
        # Connection.Channel is expected to be the module's Channel class.
        channel_cls = Connection.Channel
        assert Channel == channel_cls
386
387
@skip.if_python3()
@skip.if_pypy()
class test_Connection_get_Qpid_connection(ConnectionTestBase):
    """Connection.get_qpid_connection() hands back the stored object."""

    def test_connection_get_qpid_connection(self):
        self.conn._qpid_conn = Mock()
        fetched = self.conn.get_qpid_connection()
        assert self.conn._qpid_conn is fetched
396
397
@skip.if_python3()
@skip.if_pypy()
class test_Connection_close(ConnectionTestBase):
    """Connection.close() closes the underlying qpid connection."""

    def test_connection_close(self):
        self.conn._qpid_conn = Mock()
        self.conn.close()
        close = self.conn._qpid_conn.close
        close.assert_called_once_with()
406
407
@skip.if_python3()
@skip.if_pypy()
class test_Connection_close_channel(ConnectionTestBase):
    """Connection.close_channel() with the channel list mocked out."""

    def setup(self):
        super(test_Connection_close_channel, self).setup()
        # A Mock channel list lets the tests watch or sabotage remove().
        self.conn.channels = Mock()

    def test_connection_close_channel_removes_channel_from_channel_list(self):
        channel = Mock()
        self.conn.close_channel(channel)
        self.conn.channels.remove.assert_called_once_with(channel)

    def test_connection_close_channel_handles_ValueError_being_raised(self):
        # Passes as long as close_channel() does not let the error out.
        self.conn.channels.remove = Mock(side_effect=ValueError())
        self.conn.close_channel(Mock())

    def test_connection_close_channel_set_channel_connection_to_None(self):
        channel = Mock()
        channel.connection = False
        self.conn.channels.remove = Mock(side_effect=ValueError())
        self.conn.close_channel(channel)
        assert channel.connection is None
431
432
@skip.if_python3()
@skip.if_pypy()
class ChannelTestBase(object):
    """Base class that builds a Channel with qpidtoollibs patched out."""

    @pytest.fixture(autouse=True)
    def setup_channel(self, patching):
        toollibs = patching(QPID_MODULE + '.qpidtoollibs')
        self.mock_qpidtoollibs = toollibs
        self.mock_broker_agent = toollibs.BrokerAgent
        # Connection and transport are plain mocks handed to the Channel.
        self.conn = Mock()
        self.transport = Mock()
        self.channel = Channel(self.conn, self.transport)
444
445
@skip.if_python3()
@skip.if_pypy()
class test_Channel_purge(ChannelTestBase):
    """Channel._purge() against a mocked broker agent."""

    def setup(self):
        self.mock_queue = Mock()

    def _queue_with_depth(self, depth):
        """Give the broker's queue object a ``msgDepth`` and return it."""
        queue_obj = self.mock_broker_agent.return_value.getQueue.return_value
        queue_obj.values = {'msgDepth': depth}
        return queue_obj

    def test_gets_queue(self):
        self.channel._purge(self.mock_queue)
        broker = self.mock_broker_agent.return_value
        broker.getQueue.assert_called_once_with(self.mock_queue)

    def test_does_not_call_purge_if_message_count_is_zero(self):
        queue_obj = self._queue_with_depth(0)
        self.channel._purge(self.mock_queue)
        assert not queue_obj.purge.called

    def test_purges_all_messages_from_queue(self):
        queue_obj = self._queue_with_depth(5)
        self.channel._purge(self.mock_queue)
        queue_obj.purge.assert_called_with(5)

    def test_returns_message_count(self):
        self._queue_with_depth(5)
        purged = self.channel._purge(self.mock_queue)
        assert purged == 5

    @patch(QPID_MODULE + '.NotFound', new=QpidException)
    def test_raises_channel_error_if_queue_does_not_exist(self):
        # The broker reports no such queue by returning None.
        broker = self.mock_broker_agent.return_value
        broker.getQueue.return_value = None
        with pytest.raises(QpidException):
            self.channel._purge(self.mock_queue)
484
485
@skip.if_python3()
@skip.if_pypy()
class test_Channel_put(ChannelTestBase):
    """Channel._put() addressing, message construction and sending."""

    @patch(QPID_MODULE + '.qpid')
    def test_channel__put_onto_queue(self, mock_qpid):
        queue_name = 'routingkey'
        payload = Mock()
        message_cls = mock_qpid.messaging.Message

        self.channel._put(queue_name, payload)

        # With no exchange given, the address names a queue node.
        expected_address = (
            '{0}; {{assert: always, node: {{type: queue}}}}'.format(
                queue_name,
            )
        )
        open_sender = self.transport.session.sender
        open_sender.assert_called_with(expected_address)
        message_cls.assert_called_with(
            content=payload, subject=None, durable=True
        )
        sender = open_sender.return_value
        sender.send.assert_called_with(message_cls.return_value, sync=True)
        sender.close.assert_called_with()

    @patch(QPID_MODULE + '.qpid')
    def test_channel__put_onto_exchange(self, mock_qpid):
        routing_key = 'routingkey'
        exchange_name = 'myexchange'
        payload = Mock()
        message_cls = mock_qpid.messaging.Message

        self.channel._put(routing_key, payload, exchange_name)

        # With an exchange, the address names a topic node and the
        # routing key becomes the message subject.
        expected_address = (
            '{0}/{1}; {{assert: always, node: {{type: topic}}}}'.format(
                exchange_name, routing_key,
            )
        )
        open_sender = self.transport.session.sender
        open_sender.assert_called_with(expected_address)
        message_cls.assert_called_with(
            content=payload, subject=routing_key, durable=True
        )
        sender = open_sender.return_value
        sender.send.assert_called_with(message_cls.return_value, sync=True)
        sender.close.assert_called_with()
532
533
@skip.if_python3()
@skip.if_pypy()
class test_Channel_get(ChannelTestBase):
    """Channel._get() opens a receiver, fetches once and closes it."""

    def test_channel__get(self):
        queue = Mock()

        fetched = self.channel._get(queue)

        open_receiver = self.transport.session.receiver
        open_receiver.assert_called_once_with(queue)
        receiver = open_receiver.return_value
        receiver.fetch.assert_called_once_with(timeout=0)
        receiver.close.assert_called_once_with()
        assert receiver.fetch.return_value is fetched
548
549
@skip.if_python3()
@skip.if_pypy()
class test_Channel_close(ChannelTestBase):
    """Channel.close() with basic_cancel patched and two receivers set."""

    @pytest.fixture(autouse=True)
    def setup_basic_cancel(self, patching, setup_channel):
        # Start every test from an open channel with basic_cancel mocked.
        self.channel.closed = False
        self.mock_basic_cancel = patching.object(self.channel, 'basic_cancel')

    @pytest.fixture(autouse=True)
    def setup_receivers(self, setup_channel):
        self.mock_receiver1 = Mock()
        self.mock_receiver2 = Mock()
        receivers = {}
        receivers[1] = self.mock_receiver1
        receivers[2] = self.mock_receiver2
        self.channel._receivers = receivers

    def test_channel_close_sets_close_attribute(self):
        self.channel.close()
        assert self.channel.closed

    def test_channel_close_calls_basic_cancel_on_all_receivers(self):
        self.channel.close()
        expected_calls = [call(1), call(2)]
        self.mock_basic_cancel.assert_has_calls(expected_calls)

    def test_channel_close_calls_close_channel_on_connection(self):
        self.channel.close()
        close_channel = self.conn.close_channel
        close_channel.assert_called_once_with(self.channel)

    def test_channel_close_calls_close_on_broker_agent(self):
        self.channel.close()
        broker_close = self.channel._broker.close
        broker_close.assert_called_once_with()

    def test_channel_close_does_nothing_if_already_closed(self):
        self.channel.closed = True
        self.channel.close()
        self.mock_basic_cancel.assert_not_called()

    def test_channel_close_does_not_call_close_channel_if_conn_is_None(self):
        self.channel.connection = None
        self.channel.close()
        self.conn.close_channel.assert_not_called()
592
593
@skip.if_python3()
@skip.if_pypy()
class test_Channel_basic_qos(ChannelTestBase):
    """Channel.basic_qos() leaves the prefetch count at one."""

    def test_channel_basic_qos_always_returns_one(self):
        requested_prefetch = 2
        self.channel.basic_qos(requested_prefetch)
        assert self.channel.qos.prefetch_count == 1
601
602
@skip.if_python3()
@skip.if_pypy()
class test_Channel_basic_get(ChannelTestBase):
    """Channel.basic_get() with ``Message`` and ``_get`` mocked out."""

    @pytest.fixture(autouse=True)
    def setup_channel_attributes(self, setup_channel):
        self.channel.Message = Mock()
        self.channel._get = Mock()

    def _assert_get_acknowledges(self, **basic_get_kwargs):
        """Call basic_get() and check the fetched message was acked once."""
        self.channel.basic_get(Mock(), **basic_get_kwargs)
        fetched = self.channel._get.return_value
        acknowledge = self.transport.session.acknowledge
        acknowledge.assert_called_once_with(message=fetched)

    def test_channel_basic_get_calls__get_with_queue(self):
        queue = Mock()
        self.channel.basic_get(queue)
        self.channel._get.assert_called_once_with(queue)

    def test_channel_basic_get_creates_Message_correctly(self):
        self.channel.basic_get(Mock())
        raw_content = self.channel._get.return_value.content
        self.channel.Message.assert_called_once_with(
            raw_content, channel=self.channel,
        )

    def test_channel_basic_get_acknowledges_message_by_default(self):
        self._assert_get_acknowledges()

    def test_channel_basic_get_acknowledges_message_with_no_ack_False(self):
        self._assert_get_acknowledges(no_ack=False)

    def test_channel_basic_get_acknowledges_message_with_no_ack_True(self):
        self._assert_get_acknowledges(no_ack=True)

    def test_channel_basic_get_returns_correct_message(self):
        returned = self.channel.basic_get(Mock())
        assert self.channel.Message.return_value is returned

    def test_basic_get_returns_None_when_channel__get_raises_Empty(self):
        self.channel._get = Mock(side_effect=Empty)
        returned = self.channel.basic_get(Mock())
        # No Message may be built when there was nothing to fetch.
        assert self.channel.Message.call_count == 0
        assert returned is None
658
659
@skip.if_python3()
@skip.if_pypy()
class test_Channel_basic_cancel(ChannelTestBase):
    """Channel.basic_cancel() with one receiver registered under tag 1."""

    @pytest.fixture(autouse=True)
    def setup_receivers(self, setup_channel):
        receivers = {}
        receivers[1] = Mock()
        self.channel._receivers = receivers

    def test_channel_basic_cancel_no_error_if_consumer_tag_not_found(self):
        # Tag 2 was never registered; the call must simply not raise.
        self.channel.basic_cancel(2)

    def test_channel_basic_cancel_pops_receiver(self):
        self.channel.basic_cancel(1)
        assert 1 not in self.channel._receivers

    def test_channel_basic_cancel_closes_receiver(self):
        # Grab the receiver first: basic_cancel() removes it from the map.
        receiver = self.channel._receivers[1]
        self.channel.basic_cancel(1)
        receiver.close.assert_called_once_with()

    def test_channel_basic_cancel_pops__tag_to_queue(self):
        tag_to_queue = Mock()
        self.channel._tag_to_queue = tag_to_queue
        self.channel.basic_cancel(1)
        self.channel._tag_to_queue.pop.assert_called_once_with(1, None)

    def test_channel_basic_cancel_pops_connection__callbacks(self):
        self.channel._tag_to_queue = Mock()
        self.channel.basic_cancel(1)
        popped_queue = self.channel._tag_to_queue.pop.return_value
        pop_callback = self.conn._callbacks.pop
        pop_callback.assert_called_once_with(popped_queue, None)
690
691
@skip.if_python3()
@skip.if_pypy()
class test_Channel__init__(ChannelTestBase):
    """State of a Channel straight after construction."""

    def test_channel___init__sets_variables_as_expected(self):
        channel = self.channel
        assert self.conn is channel.connection
        assert self.transport is channel.transport
        assert not channel.closed
        self.conn.get_qpid_connection.assert_called_once_with()
        # The broker agent is whatever the patched BrokerAgent returned.
        assert channel._broker is self.mock_broker_agent.return_value
        assert channel._tag_to_queue == {}
        assert channel._receivers == {}
        assert channel._qos is None
706
707
@skip.if_python3()
@skip.if_pypy()
class test_Channel_basic_consume(ChannelTestBase):
    """Tests for Channel.basic_consume() and the callback it registers."""

    # Marks an argument of get_callback() that the caller left out.
    _UNSET = object()

    @pytest.fixture(autouse=True)
    def setup_callbacks(self, setup_channel):
        # A real dict, so the registered callback can be looked up by queue.
        self.conn._callbacks = {}

    def test_channel_basic_consume_adds_queue_to__tag_to_queue(self):
        mock_tag = Mock()
        mock_queue = Mock()
        self.channel.basic_consume(mock_queue, Mock(), Mock(), mock_tag)
        expected_dict = {mock_tag: mock_queue}
        assert expected_dict == self.channel._tag_to_queue

    def test_channel_basic_consume_adds_entry_to_connection__callbacks(self):
        mock_queue = Mock()
        self.channel.basic_consume(mock_queue, Mock(), Mock(), Mock())
        assert mock_queue in self.conn._callbacks
        assert isinstance(self.conn._callbacks[mock_queue], Callable)

    def test_channel_basic_consume_creates_new_receiver(self):
        mock_queue = Mock()
        self.channel.basic_consume(mock_queue, Mock(), Mock(), Mock())
        self.transport.session.receiver.assert_called_once_with(mock_queue)

    def test_channel_basic_consume_saves_new_receiver(self):
        mock_tag = Mock()
        self.channel.basic_consume(Mock(), Mock(), Mock(), mock_tag)
        new_mock_receiver = self.transport.session.receiver.return_value
        expected_dict = {mock_tag: new_mock_receiver}
        assert expected_dict == self.channel._receivers

    def test_channel_basic_consume_sets_capacity_on_new_receiver(self):
        mock_prefetch_count = Mock()
        self.channel.qos.prefetch_count = mock_prefetch_count
        self.channel.basic_consume(Mock(), Mock(), Mock(), Mock())
        new_receiver = self.transport.session.receiver.return_value
        assert new_receiver.capacity is mock_prefetch_count

    def get_callback(self, no_ack=_UNSET, original_cb=_UNSET):
        """Register a consumer and return the callback stored for its queue.

        :param no_ack: passed to basic_consume() as its second argument;
            a fresh Mock is used when omitted.
        :param original_cb: passed to basic_consume() as its third
            argument; a fresh Mock is used when omitted.
        :returns: the entry basic_consume() put into the connection's
            ``_callbacks`` for a new mock queue.
        """
        # Build the mocks per call.  Mock() defaults in the signature are
        # created once, when the class body runs, and would then be shared
        # (with their recorded calls) by every test using this helper.
        if no_ack is self._UNSET:
            no_ack = Mock()
        if original_cb is self._UNSET:
            original_cb = Mock()
        self.channel.Message = Mock()
        mock_queue = Mock()
        self.channel.basic_consume(mock_queue, no_ack, original_cb, Mock())
        return self.conn._callbacks[mock_queue]

    def test_channel_basic_consume_callback_creates_Message_correctly(self):
        callback = self.get_callback()
        mock_qpid_message = Mock()
        callback(mock_qpid_message)
        mock_content = mock_qpid_message.content
        self.channel.Message.assert_called_once_with(
            mock_content, channel=self.channel,
        )

    def test_channel_basic_consume_callback_adds_message_to_QoS(self):
        self.channel._qos = Mock()
        callback = self.get_callback()
        mock_qpid_message = Mock()
        callback(mock_qpid_message)
        mock_delivery_tag = self.channel.Message.return_value.delivery_tag
        self.channel._qos.append.assert_called_once_with(
            mock_qpid_message, mock_delivery_tag,
        )

    def test_channel_basic_consume_callback_gratuitously_acks(self):
        self.channel.basic_ack = Mock()
        callback = self.get_callback()
        mock_qpid_message = Mock()
        callback(mock_qpid_message)
        mock_delivery_tag = self.channel.Message.return_value.delivery_tag
        self.channel.basic_ack.assert_called_once_with(mock_delivery_tag)

    def test_channel_basic_consume_callback_does_not_ack_when_needed(self):
        self.channel.basic_ack = Mock()
        callback = self.get_callback(no_ack=False)
        mock_qpid_message = Mock()
        callback(mock_qpid_message)
        self.channel.basic_ack.assert_not_called()

    def test_channel_basic_consume_callback_calls_real_callback(self):
        self.channel.basic_ack = Mock()
        mock_original_callback = Mock()
        callback = self.get_callback(original_cb=mock_original_callback)
        mock_qpid_message = Mock()
        callback(mock_qpid_message)
        expected_message = self.channel.Message.return_value
        mock_original_callback.assert_called_once_with(expected_message)
796
797
@skip.if_python3()
@skip.if_pypy()
class test_Channel_queue_delete(ChannelTestBase):
    """Channel.queue_delete() with its private helpers patched out."""

    @pytest.fixture(autouse=True)
    def setup_channel_patches(self, patching, setup_channel):
        channel = self.channel
        self.mock__has_queue = patching.object(channel, '_has_queue')
        self.mock__size = patching.object(channel, '_size')
        self.mock__delete = patching.object(channel, '_delete')
        self.mock_queue = Mock()

    def test_checks_if_queue_exists(self):
        self.channel.queue_delete(self.mock_queue)
        self.mock__has_queue.assert_called_once_with(self.mock_queue)

    def test_does_nothing_if_queue_does_not_exist(self):
        self.mock__has_queue.return_value = False
        self.channel.queue_delete(self.mock_queue)
        self.mock__delete.assert_not_called()

    def test_not_empty_and_if_empty_True_no_delete(self):
        # One message is reported, so if_empty=True must stop the delete
        # before the broker is even asked for the queue.
        self.mock__size.return_value = 1
        self.channel.queue_delete(self.mock_queue, if_empty=True)
        broker = self.mock_broker_agent.return_value
        broker.getQueue.assert_not_called()

    def test_calls_get_queue(self):
        self.channel.queue_delete(self.mock_queue)
        broker = self.mock_broker_agent.return_value
        broker.getQueue.assert_called_once_with(self.mock_queue)

    def test_gets_queue_attribute(self):
        self.channel.queue_delete(self.mock_queue)
        broker = self.mock_broker_agent.return_value
        broker.getQueue.return_value.getAttributes.assert_called_once_with()

    def test_queue_in_use_and_if_unused_no_delete(self):
        broker = self.mock_broker_agent.return_value
        in_use_queue = broker.getQueue.return_value
        in_use_queue.getAttributes.return_value = {'consumerCount': 1}
        self.channel.queue_delete(self.mock_queue, if_unused=True)
        self.mock__delete.assert_not_called()

    def test_calls__delete_with_queue(self):
        self.channel.queue_delete(self.mock_queue)
        self.mock__delete.assert_called_once_with(self.mock_queue)
843
844
@skip.if_python3()
@skip.if_pypy()
class test_Channel(object):
    """Unit tests for ``Channel`` with every qpid collaborator mocked.

    ``setup`` builds the channel while ``qpidtoollibs`` is patched, so the
    object returned by calling the patched ``BrokerAgent`` is
    ``self.mock_broker``.
    """

    @patch(QPID_MODULE + '.qpidtoollibs')
    def setup(self, mock_qpidtoollibs):
        """Create ``self.my_channel`` on top of mock connection objects."""
        self.mock_connection = Mock()
        self.mock_qpid_connection = Mock()
        self.mock_qpid_session = Mock()
        self.mock_qpid_connection.session = Mock(
            return_value=self.mock_qpid_session,
        )
        self.mock_connection.get_qpid_connection = Mock(
            return_value=self.mock_qpid_connection,
        )
        self.mock_transport = Mock()
        self.mock_broker = Mock()
        self.mock_Message = Mock()
        self.mock_BrokerAgent = mock_qpidtoollibs.BrokerAgent
        self.mock_BrokerAgent.return_value = self.mock_broker
        self.my_channel = Channel(
            self.mock_connection, self.mock_transport,
        )
        # Shadow ``Message`` on this instance only; the class attribute is
        # checked separately in test_verify_Message_class_attribute.
        self.my_channel.Message = self.mock_Message

    @staticmethod
    def _make_queue(startswith=False, endswith=False):
        """Return a queue mock with canned startswith/endswith results."""
        queue = Mock()
        queue.startswith.return_value = startswith
        queue.endswith.return_value = endswith
        return queue

    def _stub_queue_data(self):
        """Make ``getQueue`` return data carrying mock depth/consumer counts.

        Returns the ``(msg_count, consumer_count)`` mocks that were stored
        under ``'msgDepth'`` and ``'consumerCount'``.
        """
        msg_count = Mock()
        consumer_count = Mock()
        queue_data = Mock()
        queue_data.values = {
            'msgDepth': msg_count,
            'consumerCount': consumer_count,
        }
        self.mock_broker.getQueue.return_value = queue_data
        return msg_count, consumer_count

    @staticmethod
    def _expected_queue_options(ring_policy=False):
        """Return the ``addQueue`` options expected for a default declare."""
        options = {
            'passive': False,
            'durable': False,
            'exclusive': False,
            'auto-delete': True,
            'arguments': None,
        }
        if ring_policy:
            options['qpid.policy_type'] = 'ring'
        return options

    def test_verify_QoS_class_attribute(self):
        """Verify that the class attribute QoS refers to the QoS object"""
        assert QoS is Channel.QoS

    def test_verify_Message_class_attribute(self):
        """Verify that the class attribute Message refers to the Message
        object."""
        assert Message is Channel.Message

    def test_body_encoding_class_attribute(self):
        """Verify that the class attribute body_encoding is set to base64"""
        assert Channel.body_encoding == 'base64'

    def test_codecs_class_attribute(self):
        """Verify that the codecs class attribute has a correct key and
        value."""
        assert isinstance(Channel.codecs, dict)
        assert 'base64' in Channel.codecs
        assert isinstance(Channel.codecs['base64'], Base64)

    def test_size(self):
        """Test that _size() returns the ``msgDepth`` of the queue that the
        broker reports for the given name."""
        message_count = 5
        mock_queue = Mock()
        mock_queue_to_check = Mock()
        mock_queue_to_check.values = {'msgDepth': message_count}
        self.mock_broker.getQueue.return_value = mock_queue_to_check
        result = self.my_channel._size(mock_queue)
        self.mock_broker.getQueue.assert_called_with(mock_queue)
        assert message_count == result

    def test_delete(self):
        """Test deleting a queue calls purge and delQueue with queue name."""
        mock_queue = Mock()
        self.my_channel._purge = Mock()
        result = self.my_channel._delete(mock_queue)
        self.my_channel._purge.assert_called_with(mock_queue)
        self.mock_broker.delQueue.assert_called_with(mock_queue)
        assert result is None

    def test_has_queue_true(self):
        """Test checking if a queue exists, and it does."""
        self.mock_broker.getQueue.return_value = True
        assert self.my_channel._has_queue(Mock())

    def test_has_queue_false(self):
        """Test checking if a queue exists, and it does not."""
        self.mock_broker.getQueue.return_value = False
        assert not self.my_channel._has_queue(Mock())

    @patch('amqp.protocol.queue_declare_ok_t')
    def test_queue_declare_with_exception_raised(self,
                                                 mock_queue_declare_ok_t):
        """Test declare_queue, where an exception is raised and silenced."""
        mock_queue = self._make_queue()
        mock_passive = Mock()
        mock_durable = Mock()
        mock_exclusive = Mock()
        mock_auto_delete = Mock()
        mock_nowait = Mock()
        mock_arguments = Mock()
        expected_options = {
            'passive': mock_passive,
            'durable': mock_durable,
            'exclusive': mock_exclusive,
            'auto-delete': mock_auto_delete,
            'arguments': mock_arguments,
        }
        mock_msg_count, mock_consumer_count = self._stub_queue_data()
        mock_return_value = Mock()
        mock_queue_declare_ok_t.return_value = mock_return_value
        self.mock_broker.addQueue.side_effect = Exception(
            'The foo object already exists.',
        )
        result = self.my_channel.queue_declare(
            mock_queue,
            passive=mock_passive,
            durable=mock_durable,
            exclusive=mock_exclusive,
            auto_delete=mock_auto_delete,
            nowait=mock_nowait,
            arguments=mock_arguments,
        )
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=expected_options,
        )
        mock_queue_declare_ok_t.assert_called_with(
            mock_queue, mock_msg_count, mock_consumer_count,
        )
        assert mock_return_value is result

    def test_queue_declare_set_ring_policy_for_celeryev(self):
        """Test declare_queue sets ring_policy for celeryev."""
        mock_queue = self._make_queue(startswith=True)
        self.mock_broker.addQueue.return_value = None
        self._stub_queue_data()
        self.my_channel.queue_declare(mock_queue)
        mock_queue.startswith.assert_called_with('celeryev')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue,
            options=self._expected_queue_options(ring_policy=True),
        )

    def test_queue_declare_set_ring_policy_for_pidbox(self):
        """Test declare_queue sets ring_policy for pidbox."""
        mock_queue = self._make_queue(endswith=True)
        self.mock_broker.addQueue.return_value = None
        self._stub_queue_data()
        self.my_channel.queue_declare(mock_queue)
        mock_queue.endswith.assert_called_with('pidbox')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue,
            options=self._expected_queue_options(ring_policy=True),
        )

    def test_queue_declare_ring_policy_not_set_as_expected(self):
        """Test declare_queue does not set ring_policy as expected."""
        mock_queue = self._make_queue()
        self.mock_broker.addQueue.return_value = None
        self._stub_queue_data()
        self.my_channel.queue_declare(mock_queue)
        mock_queue.startswith.assert_called_with('celeryev')
        mock_queue.endswith.assert_called_with('pidbox')
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=self._expected_queue_options(),
        )

    def test_queue_declare_test_defaults(self):
        """Test declare_queue defaults."""
        mock_queue = self._make_queue()
        self.mock_broker.addQueue.return_value = None
        self._stub_queue_data()
        self.my_channel.queue_declare(mock_queue)
        self.mock_broker.addQueue.assert_called_with(
            mock_queue, options=self._expected_queue_options(),
        )

    def test_queue_declare_raises_exception_not_silenced(self):
        """Test declare_queue re-raises errors other than 'already exists'."""
        unique_exception = Exception('This exception should not be silenced')
        # ``startswith`` is made truthy explicitly; a bare Mock() would be
        # truthy as well, which is the only reason the ring policy shows
        # up in the expected options below.
        mock_queue = self._make_queue(startswith=True)
        self.mock_broker.addQueue.side_effect = unique_exception
        with pytest.raises(Exception) as excinfo:
            self.my_channel.queue_declare(mock_queue)
        # Any error satisfies ``pytest.raises(Exception)``; make sure it is
        # the broker's exception that propagated.
        assert excinfo.value is unique_exception
        self.mock_broker.addQueue.assert_called_once_with(
            mock_queue,
            options=self._expected_queue_options(ring_policy=True),
        )

    def test_exchange_declare_raises_exception_and_silenced(self):
        """Create exchange where an exception is raised and then silenced"""
        self.mock_broker.addExchange.side_effect = Exception(
            'The foo object already exists.',
        )
        # Passes as long as the call below does not raise.
        self.my_channel.exchange_declare()

    def test_exchange_declare_raises_exception_not_silenced(self):
        """Create Exchange where an exception is raised and not silenced."""
        unique_exception = Exception('This exception should not be silenced')
        self.mock_broker.addExchange.side_effect = unique_exception
        with pytest.raises(Exception) as excinfo:
            self.my_channel.exchange_declare()
        # Any error satisfies ``pytest.raises(Exception)``; make sure it is
        # the broker's exception that propagated.
        assert excinfo.value is unique_exception

    def test_exchange_declare(self):
        """Create Exchange where an exception is NOT raised."""
        mock_exchange = Mock()
        mock_type = Mock()
        mock_durable = Mock()
        result = self.my_channel.exchange_declare(
            mock_exchange, mock_type, mock_durable,
        )
        self.mock_broker.addExchange.assert_called_with(
            mock_type, mock_exchange, {'durable': mock_durable},
        )
        assert result is None

    def test_exchange_delete(self):
        """Test the deletion of an exchange by name."""
        mock_exchange = Mock()
        result = self.my_channel.exchange_delete(mock_exchange)
        self.mock_broker.delExchange.assert_called_with(mock_exchange)
        assert result is None

    def test_queue_bind(self):
        """Test binding a queue to an exchange using a routing key."""
        mock_queue = Mock()
        mock_exchange = Mock()
        mock_routing_key = Mock()
        self.my_channel.queue_bind(
            mock_queue, mock_exchange, mock_routing_key,
        )
        self.mock_broker.bind.assert_called_with(
            mock_exchange, mock_queue, mock_routing_key,
        )

    def test_queue_unbind(self):
        """Test unbinding a queue from an exchange using a routing key."""
        mock_queue = Mock()
        mock_exchange = Mock()
        mock_routing_key = Mock()
        self.my_channel.queue_unbind(
            mock_queue, mock_exchange, mock_routing_key,
        )
        self.mock_broker.unbind.assert_called_with(
            mock_exchange, mock_queue, mock_routing_key,
        )

    def test_queue_purge(self):
        """Test purging a queue by name."""
        mock_queue = Mock()
        purge_result = Mock()
        self.my_channel._purge = Mock(return_value=purge_result)
        result = self.my_channel.queue_purge(mock_queue)
        self.my_channel._purge.assert_called_with(mock_queue)
        assert purge_result is result

    @patch(QPID_MODULE + '.Channel.qos')
    def test_basic_ack(self, mock_qos):
        """Test that basic_ack calls the QoS object properly."""
        mock_delivery_tag = Mock()
        self.my_channel.basic_ack(mock_delivery_tag)
        mock_qos.ack.assert_called_with(mock_delivery_tag)

    @patch(QPID_MODULE + '.Channel.qos')
    def test_basic_reject(self, mock_qos):
        """Test that basic_reject calls the QoS object properly."""
        mock_delivery_tag = Mock()
        mock_requeue_value = Mock()
        self.my_channel.basic_reject(mock_delivery_tag, mock_requeue_value)
        mock_qos.reject.assert_called_with(
            mock_delivery_tag, requeue=mock_requeue_value,
        )

    def test_qos_manager_is_none(self):
        """Test the qos property if the QoS object did not already exist."""
        self.my_channel._qos = None
        result = self.my_channel.qos
        assert isinstance(result, QoS)
        # Identity, not equality: the property must hand back the very
        # object it stored on ``_qos``.
        assert result is self.my_channel._qos

    def test_qos_manager_already_exists(self):
        """Test the qos property if the QoS object already exists."""
        mock_existing_qos = Mock()
        self.my_channel._qos = mock_existing_qos
        result = self.my_channel.qos
        assert mock_existing_qos is result

    def test_prepare_message(self):
        """Test that prepare_message() returns the correct result."""
        mock_body = Mock()
        mock_priority = Mock()
        mock_content_encoding = Mock()
        mock_content_type = Mock()
        headers = {'header1': Mock(), 'header2': Mock()}
        properties = {'properties1': Mock(), 'properties2': Mock()}
        result = self.my_channel.prepare_message(
            mock_body,
            priority=mock_priority,
            content_type=mock_content_type,
            content_encoding=mock_content_encoding,
            headers=headers,
            properties=properties)
        assert mock_body is result['body']
        assert mock_content_encoding is result['content-encoding']
        assert mock_content_type is result['content-type']
        assert headers == result['headers']
        assert properties == result['properties']
        assert (mock_priority is
                result['properties']['delivery_info']['priority'])

    @patch('__builtin__.buffer')
    @patch(QPID_MODULE + '.Channel.body_encoding')
    @patch(QPID_MODULE + '.Channel.encode_body')
    @patch(QPID_MODULE + '.Channel._put')
    def test_basic_publish(self, mock_put,
                           mock_encode_body,
                           mock_body_encoding,
                           mock_buffer):
        """Test basic_publish()."""
        mock_original_body = Mock()
        mock_encoded_body = 'this is my encoded body'
        mock_message = {'body': mock_original_body,
                        'properties': {'delivery_info': {}}}
        mock_encode_body.return_value = (
            mock_encoded_body, mock_body_encoding,
        )
        mock_exchange = Mock()
        mock_routing_key = Mock()
        mock_encoded_buffered_body = Mock()
        mock_buffer.return_value = mock_encoded_buffered_body
        self.my_channel.basic_publish(
            mock_message, mock_exchange, mock_routing_key,
        )
        mock_encode_body.assert_called_once_with(
            mock_original_body, mock_body_encoding,
        )
        mock_buffer.assert_called_once_with(mock_encoded_body)
        # basic_publish() is expected to fill in the message dict in place.
        published_properties = mock_message['properties']
        delivery_info = published_properties['delivery_info']
        assert mock_message['body'] is mock_encoded_buffered_body
        assert published_properties['body_encoding'] is mock_body_encoding
        assert isinstance(published_properties['delivery_tag'], uuid.UUID)
        assert delivery_info['exchange'] is mock_exchange
        assert delivery_info['routing_key'] is mock_routing_key
        mock_put.assert_called_with(
            mock_routing_key, mock_message, mock_exchange,
        )

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_encode_body_expected_encoding(self, mock_codecs):
        """Test if encode_body() works when encoding is set correctly"""
        mock_body = Mock()
        mock_encoder = Mock()
        mock_encoded_result = Mock()
        mock_codecs.get.return_value = mock_encoder
        mock_encoder.encode.return_value = mock_encoded_result
        result = self.my_channel.encode_body(mock_body, encoding='base64')
        expected_result = (mock_encoded_result, 'base64')
        assert expected_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_encode_body_not_expected_encoding(self, mock_codecs):
        """Test if encode_body() works when encoding is not set correctly."""
        mock_body = Mock()
        result = self.my_channel.encode_body(mock_body, encoding=None)
        expected_result = mock_body, None
        assert expected_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_decode_body_expected_encoding(self, mock_codecs):
        """Test if decode_body() works when encoding is set correctly."""
        mock_body = Mock()
        mock_decoder = Mock()
        mock_decoded_result = Mock()
        mock_codecs.get.return_value = mock_decoder
        mock_decoder.decode.return_value = mock_decoded_result
        result = self.my_channel.decode_body(mock_body, encoding='base64')
        assert mock_decoded_result == result

    @patch(QPID_MODULE + '.Channel.codecs')
    def test_decode_body_not_expected_encoding(self, mock_codecs):
        """Test if decode_body() works when encoding is not set correctly."""
        mock_body = Mock()
        result = self.my_channel.decode_body(mock_body, encoding=None)
        assert mock_body == result

    def test_typeof_exchange_exists(self):
        """Test that typeof() finds an exchange that already exists."""
        mock_exchange = Mock()
        mock_type = Mock()
        mock_qpid_exchange = Mock()
        mock_qpid_exchange.getAttributes.return_value = {'type': mock_type}
        self.mock_broker.getExchange.return_value = mock_qpid_exchange
        result = self.my_channel.typeof(mock_exchange)
        assert mock_type is result

    def test_typeof_exchange_does_not_exist(self):
        """Test that typeof() returns the default when the exchange does
        not exist."""
        mock_exchange = Mock()
        mock_default = Mock()
        self.mock_broker.getExchange.return_value = None
        result = self.my_channel.typeof(mock_exchange, default=mock_default)
        assert mock_default is result
1331
1332
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport__init__(object):
    """Tests for ``Transport.__init__`` with its collaborators patched."""

    @pytest.fixture(autouse=True)
    def mock_verify_runtime_environment(self, patching):
        # Note: the instance attribute set here reuses the fixture's name.
        verify_mock = patching.object(
            Transport, 'verify_runtime_environment')
        self.mock_verify_runtime_environment = verify_mock

    @pytest.fixture(autouse=True)
    def mock_transport_init(self, patching):
        base_init_path = QPID_MODULE + '.base.Transport.__init__'
        self.mock_base_Transport__init__ = patching(base_init_path)

    def test_Transport___init___calls_verify_runtime_environment(self):
        """Constructing a Transport verifies the runtime environment once."""
        Transport(Mock())
        self.mock_verify_runtime_environment.assert_called_once_with()

    def test_transport___init___calls_parent_class___init__(self):
        """The client argument is forwarded to the base class __init__."""
        client = Mock()
        Transport(client)
        self.mock_base_Transport__init__.assert_called_once_with(client)

    def test_transport___init___sets_use_async_interface_False(self):
        """A new Transport has a falsy ``use_async_interface``."""
        assert not Transport(Mock()).use_async_interface
1360
1361
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_drain_events(object):
    """Tests for ``Transport.drain_events()`` against a mocked session."""

    @pytest.fixture(autouse=True)
    def setup_self(self, disable_runtime_dependency_check):
        # The dependency-check fixture is requested as an argument so that
        # it is applied before the Transport below is constructed.
        self.transport = Transport(Mock())
        self.transport.session = Mock()
        self.mock_queue = Mock()
        self.mock_message = Mock()
        self.mock_callback = Mock()
        self.mock_conn = Mock()
        self.mock_conn._callbacks = {self.mock_queue: self.mock_callback}

    def mock_next_receiver(self, timeout):
        """Stand-in for ``session.next_receiver``.

        Sleeps 0.3 seconds, then returns a receiver whose ``source`` is
        ``self.mock_queue`` and whose ``fetch()`` returns
        ``self.mock_message``.  ``timeout`` is accepted but not used.
        """
        time.sleep(0.3)
        receiver = Mock()
        receiver.source = self.mock_queue
        receiver.fetch.return_value = self.mock_message
        return receiver

    def test_socket_timeout_raised_when_all_receivers_empty(self):
        """``QpidEmpty`` from next_receiver surfaces as socket.timeout."""
        with patch(QPID_MODULE + '.QpidEmpty', new=QpidException):
            next_receiver = self.transport.session.next_receiver
            next_receiver.side_effect = QpidException()
            with pytest.raises(socket.timeout):
                self.transport.drain_events(Mock())

    def test_socket_timeout_raised_when_by_timeout(self):
        """Draining with ``timeout=1`` ends by raising socket.timeout."""
        self.transport.session.next_receiver = self.mock_next_receiver
        with pytest.raises(socket.timeout):
            self.transport.drain_events(self.mock_conn, timeout=1)

    def test_timeout_returns_no_earlier_then_asked_for(self):
        """drain_events() takes at least the requested one second."""
        self.transport.session.next_receiver = self.mock_next_receiver
        began_at = monotonic()
        try:
            self.transport.drain_events(self.mock_conn, timeout=1)
        except socket.timeout:
            # Only the elapsed time matters to this test.
            pass
        assert monotonic() - began_at >= 1.0

    def test_callback_is_called(self):
        """The callback registered for the queue gets the fetched message."""
        self.transport.session.next_receiver = self.mock_next_receiver
        try:
            self.transport.drain_events(self.mock_conn, timeout=1)
        except socket.timeout:
            # Only the callback invocation matters to this test.
            pass
        self.mock_callback.assert_called_with(self.mock_message)
1413
1414
@skip.if_python3()
@skip.if_pypy()
class test_Transport_create_channel(object):
    """Tests for ``Transport.create_channel()`` using a mock connection."""

    @pytest.fixture(autouse=True)
    def setup_self(self, disable_runtime_dependency_check):
        # The dependency-check fixture is requested as an argument so that
        # it is applied before the Transport below is constructed.
        self.transport = Transport(Mock())
        self.mock_new_channel = Mock()
        self.mock_conn = Mock()
        self.mock_conn.Channel.return_value = self.mock_new_channel
        self.returned_channel = self.transport.create_channel(self.mock_conn)

    def test_new_channel_created_from_connection(self):
        """The channel comes from ``connection.Channel(conn, transport)``."""
        assert self.mock_new_channel is self.returned_channel
        channel_factory = self.mock_conn.Channel
        channel_factory.assert_called_with(self.mock_conn, self.transport)

    def test_new_channel_added_to_connection_channel_list(self):
        """The new channel is appended to ``connection.channels``."""
        self.mock_conn.channels.append.assert_called_with(
            self.mock_new_channel,
        )
1437
1438
1439@skip.if_python3()
1440@skip.if_pypy()
1441@pytest.mark.usefixtures('disable_runtime_dependency_check')
1442class test_Transport_establish_connection(object):
1443
1444    @pytest.fixture(autouse=True)
1445    def setup_self(self, disable_runtime_dependency_check):
1446
1447        class MockClient(object):
1448            pass
1449
1450        self.client = MockClient()
1451        self.client.connect_timeout = 4
1452        self.client.ssl = False
1453        self.client.transport_options = {}
1454        self.client.userid = None
1455        self.client.password = None
1456        self.client.login_method = None
1457        self.transport = Transport(self.client)
1458        self.mock_conn = Mock()
1459        self.transport.Connection = self.mock_conn
1460
1461    def test_transport_establish_conn_new_option_overwrites_default(self):
1462        self.client.userid = 'new-userid'
1463        self.client.password = 'new-password'
1464        self.transport.establish_connection()
1465        self.mock_conn.assert_called_once_with(
1466            username=self.client.userid,
1467            password=self.client.password,
1468            sasl_mechanisms='PLAIN',
1469            host='localhost',
1470            timeout=4,
1471            port=5672,
1472            transport='tcp',
1473        )
1474
1475    def test_transport_establish_conn_empty_client_is_default(self):
1476        self.transport.establish_connection()
1477        self.mock_conn.assert_called_once_with(
1478            sasl_mechanisms='ANONYMOUS',
1479            host='localhost',
1480            timeout=4,
1481            port=5672,
1482            transport='tcp',
1483        )
1484
1485    def test_transport_establish_conn_additional_transport_option(self):
1486        new_param_value = 'mynewparam'
1487        self.client.transport_options['new_param'] = new_param_value
1488        self.transport.establish_connection()
1489        self.mock_conn.assert_called_once_with(
1490            sasl_mechanisms='ANONYMOUS',
1491            host='localhost',
1492            timeout=4,
1493            new_param=new_param_value,
1494            port=5672,
1495            transport='tcp',
1496        )
1497
1498    def test_transport_establish_conn_transform_localhost_to_127_0_0_1(self):
1499        self.client.hostname = 'localhost'
1500        self.transport.establish_connection()
1501        self.mock_conn.assert_called_once_with(
1502            sasl_mechanisms='ANONYMOUS',
1503            host='localhost',
1504            timeout=4,
1505            port=5672,
1506            transport='tcp',
1507        )
1508
1509    def test_transport_password_no_userid_raises_exception(self):
1510        self.client.password = 'somepass'
1511        with pytest.raises(Exception):
1512            self.transport.establish_connection()
1513
1514    def test_transport_userid_no_password_raises_exception(self):
1515        self.client.userid = 'someusername'
1516        with pytest.raises(Exception):
1517            self.transport.establish_connection()
1518
1519    def test_transport_overrides_sasl_mech_from_login_method(self):
1520        self.client.login_method = 'EXTERNAL'
1521        self.transport.establish_connection()
1522        self.mock_conn.assert_called_once_with(
1523            sasl_mechanisms='EXTERNAL',
1524            host='localhost',
1525            timeout=4,
1526            port=5672,
1527            transport='tcp',
1528        )
1529
1530    def test_transport_overrides_sasl_mech_has_username(self):
1531        self.client.userid = 'new-userid'
1532        self.client.login_method = 'EXTERNAL'
1533        self.transport.establish_connection()
1534        self.mock_conn.assert_called_once_with(
1535            username=self.client.userid,
1536            sasl_mechanisms='EXTERNAL',
1537            host='localhost',
1538            timeout=4,
1539            port=5672,
1540            transport='tcp',
1541        )
1542
1543    def test_transport_establish_conn_set_password(self):
1544        self.client.userid = 'someuser'
1545        self.client.password = 'somepass'
1546        self.transport.establish_connection()
1547        self.mock_conn.assert_called_once_with(
1548            username='someuser',
1549            password='somepass',
1550            sasl_mechanisms='PLAIN',
1551            host='localhost',
1552            timeout=4,
1553            port=5672,
1554            transport='tcp',
1555        )
1556
1557    def test_transport_establish_conn_no_ssl_sets_transport_tcp(self):
1558        self.client.ssl = False
1559        self.transport.establish_connection()
1560        self.mock_conn.assert_called_once_with(
1561            sasl_mechanisms='ANONYMOUS',
1562            host='localhost',
1563            timeout=4,
1564            port=5672,
1565            transport='tcp',
1566        )
1567
1568    def test_transport_establish_conn_with_ssl_with_hostname_check(self):
1569        self.client.ssl = {
1570            'keyfile': 'my_keyfile',
1571            'certfile': 'my_certfile',
1572            'ca_certs': 'my_cacerts',
1573            'cert_reqs': ssl.CERT_REQUIRED,
1574        }
1575        self.transport.establish_connection()
1576        self.mock_conn.assert_called_once_with(
1577            ssl_certfile='my_certfile',
1578            ssl_trustfile='my_cacerts',
1579            timeout=4,
1580            ssl_skip_hostname_check=False,
1581            sasl_mechanisms='ANONYMOUS',
1582            host='localhost',
1583            ssl_keyfile='my_keyfile',
1584            port=5672, transport='ssl',
1585        )
1586
1587    def test_transport_establish_conn_with_ssl_skip_hostname_check(self):
1588        self.client.ssl = {
1589            'keyfile': 'my_keyfile',
1590            'certfile': 'my_certfile',
1591            'ca_certs': 'my_cacerts',
1592            'cert_reqs': ssl.CERT_OPTIONAL,
1593        }
1594        self.transport.establish_connection()
1595        self.mock_conn.assert_called_once_with(
1596            ssl_certfile='my_certfile',
1597            ssl_trustfile='my_cacerts',
1598            timeout=4,
1599            ssl_skip_hostname_check=True,
1600            sasl_mechanisms='ANONYMOUS',
1601            host='localhost',
1602            ssl_keyfile='my_keyfile',
1603            port=5672, transport='ssl',
1604        )
1605
1606    def test_transport_establish_conn_sets_client_on_connection_object(self):
1607        self.transport.establish_connection()
1608        assert self.mock_conn.return_value.client is self.client
1609
1610    def test_transport_establish_conn_creates_session_on_transport(self):
1611        self.transport.establish_connection()
1612        qpid_conn = self.mock_conn.return_value.get_qpid_connection
1613        new_mock_session = qpid_conn.return_value.session.return_value
1614        assert self.transport.session is new_mock_session
1615
1616    def test_transport_establish_conn_returns_new_connection_object(self):
1617        new_conn = self.transport.establish_connection()
1618        assert new_conn is self.mock_conn.return_value
1619
1620    def test_transport_establish_conn_uses_hostname_if_not_default(self):
1621        self.client.hostname = 'some_other_hostname'
1622        self.transport.establish_connection()
1623        self.mock_conn.assert_called_once_with(
1624            sasl_mechanisms='ANONYMOUS',
1625            host='some_other_hostname',
1626            timeout=4,
1627            port=5672,
1628            transport='tcp',
1629        )
1630
1631    def test_transport_sets_qpid_message_ready_handler(self):
1632        self.transport.establish_connection()
1633        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
1634        mock_session = qpid_conn_call.return_value.session.return_value
1635        mock_set_callback = mock_session.set_message_received_notify_handler
1636        expected_msg_callback = self.transport._qpid_message_ready_handler
1637        mock_set_callback.assert_called_once_with(expected_msg_callback)
1638
1639    def test_transport_sets_session_exception_handler(self):
1640        self.transport.establish_connection()
1641        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
1642        mock_session = qpid_conn_call.return_value.session.return_value
1643        mock_set_callback = mock_session.set_async_exception_notify_handler
1644        exc_callback = self.transport._qpid_async_exception_notify_handler
1645        mock_set_callback.assert_called_once_with(exc_callback)
1646
1647    def test_transport_sets_connection_exception_handler(self):
1648        self.transport.establish_connection()
1649        qpid_conn_call = self.mock_conn.return_value.get_qpid_connection
1650        qpid_conn = qpid_conn_call.return_value
1651        mock_set_callback = qpid_conn.set_async_exception_notify_handler
1652        exc_callback = self.transport._qpid_async_exception_notify_handler
1653        mock_set_callback.assert_called_once_with(exc_callback)
1654
1655
@skip.if_python3()
@skip.if_pypy()
class test_Transport_class_attributes(object):
    """Checks attributes read directly from the Transport class."""

    def test_verify_Connection_attribute(self):
        """Transport.Connection is the imported Connection class."""
        assert Transport.Connection is Connection

    def test_verify_polling_disabled(self):
        """Transport.polling_interval is None."""
        interval = Transport.polling_interval
        assert interval is None

    def test_verify_driver_type_and_name(self):
        """Both driver_type and driver_name are 'qpid'."""
        for value in (Transport.driver_type, Transport.driver_name):
            assert value == 'qpid'

    def test_verify_implements_exchange_types(self):
        """fanout, direct and topic are listed; an unknown type is not."""
        exchange_types = Transport.implements.exchange_type
        for supported in ('fanout', 'direct', 'topic'):
            assert supported in exchange_types
        assert 'frobnitz' not in exchange_types

    def test_transport_verify_recoverable_connection_errors(self):
        """ConnectionError and select.error are recoverable."""
        recoverable = Transport.recoverable_connection_errors
        for exc_type in (ConnectionError, select.error):
            assert exc_type in recoverable

    def test_transport_verify_recoverable_channel_errors(self):
        """NotFound is a recoverable channel error."""
        assert NotFound in Transport.recoverable_channel_errors

    def test_transport_verify_pre_kombu_3_0_exception_labels(self):
        """The older attribute names equal the recoverable_* ones."""
        recoverable_channel = Transport.recoverable_channel_errors
        assert recoverable_channel == Transport.channel_errors
        recoverable_connection = Transport.recoverable_connection_errors
        assert recoverable_connection == Transport.connection_errors
1690
1691
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_register_with_event_loop(object):
    """Checks how Transport hooks itself into an event loop."""

    def test_transport_register_with_event_loop_calls_add_reader(self):
        """add_reader receives transport.r, on_readable and both arguments."""
        loop, connection = Mock(), Mock()
        transport = Transport(Mock())
        transport.register_with_event_loop(connection, loop)
        expected_args = (
            transport.r, transport.on_readable, connection, loop,
        )
        loop.add_reader.assert_called_with(*expected_args)
1705
1706
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_Qpid_callback_handlers_async(object):
    """Qpid callbacks on a Transport registered with an event loop."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching, disable_runtime_dependency_check):
        """Patch os.write, then build and register a Transport."""
        self.mock_os_write = patching(QPID_MODULE + '.os.write')
        transport = Transport(Mock())
        transport.register_with_event_loop(Mock(), Mock())
        self.transport = transport

    def test__qpid_message_ready_handler_writes_symbol_to_fd(self):
        """The message-ready handler writes '0' to transport._w."""
        self.transport._qpid_message_ready_handler(Mock())
        write_fd = self.transport._w
        self.mock_os_write.assert_called_once_with(write_fd, '0')

    def test__qpid_async_exception_notify_handler_writes_symbol_to_fd(self):
        """The async exception handler writes 'e' to transport._w."""
        self.transport._qpid_async_exception_notify_handler(Mock(), Mock())
        write_fd = self.transport._w
        self.mock_os_write.assert_called_once_with(write_fd, 'e')
1725
1726
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_Qpid_callback_handlers_sync(object):
    """Qpid callbacks on a Transport never registered with an event loop."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching, disable_runtime_dependency_check):
        """Patch os.write and build a Transport around a mock client.

        register_with_event_loop() is deliberately not called, so the
        tests below observe the handlers on an unregistered transport.
        """
        self.mock_os_write = patching(QPID_MODULE + '.os.write')
        self.transport = Transport(Mock())

    def test__qpid_message_ready_handler_does_not_write(self):
        """The message-ready handler must not call os.write."""
        self.transport._qpid_message_ready_handler(Mock())
        self.mock_os_write.assert_not_called()

    def test__qpid_async_exception_notify_handler_does_not_write(self):
        """The async exception handler must not call os.write."""
        self.transport._qpid_async_exception_notify_handler(Mock(), Mock())
        self.mock_os_write.assert_not_called()
1744
1745
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_on_readable(object):
    """Checks Transport.on_readable with os.read and drain_events patched."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching, disable_runtime_dependency_check):
        """Patch os.read and drain_events, then register a Transport."""
        self.mock_os_read = patching(QPID_MODULE + '.os.read')
        self.mock_drain_events = patching.object(Transport, 'drain_events')

        transport = Transport(Mock())
        transport.register_with_event_loop(Mock(), Mock())
        self.transport = transport

    def test_transport_on_readable_reads_symbol_from_fd(self):
        """on_readable reads exactly one byte from transport.r."""
        self.transport.on_readable(Mock(), Mock())
        read_fd = self.transport.r
        self.mock_os_read.assert_called_once_with(read_fd, 1)

    def test_transport_on_readable_calls_drain_events(self):
        """on_readable hands the connection to drain_events."""
        connection = Mock()
        self.transport.on_readable(connection, Mock())
        self.mock_drain_events.assert_called_with(connection)

    def test_transport_on_readable_catches_socket_timeout(self):
        """A socket.timeout from drain_events does not propagate."""
        self.mock_drain_events.side_effect = socket.timeout()
        # Passing means no exception escaped on_readable.
        self.transport.on_readable(Mock(), Mock())

    def test_transport_on_readable_ignores_non_socket_timeout_exception(self):
        """An IOError from drain_events propagates to the caller."""
        self.mock_drain_events.side_effect = IOError()
        with pytest.raises(IOError):
            self.transport.on_readable(Mock(), Mock())
1776
1777
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport_verify_runtime_environment(object):
    """Checks Transport.verify_runtime_environment under patched conditions."""

    @pytest.fixture(autouse=True)
    def setup_self(self, patching):
        """Save the real check, patch it on the class, build a Transport."""
        # The reference must be taken before patching, otherwise the tests
        # would call the mock. Presumably Transport() would otherwise run
        # the real check during construction -- TODO confirm.
        real_check = Transport.verify_runtime_environment
        patching.object(Transport, 'verify_runtime_environment')
        self.verify_runtime_environment = real_check
        self.transport = Transport(Mock())

    def test_raises_exception_for_Python3(self):
        """RuntimeError is raised while the module's PY3 flag is True."""
        with patch(QPID_MODULE + '.PY3', new=True):
            with pytest.raises(RuntimeError):
                self.verify_runtime_environment(self.transport)

    # Kept as a decorator: the builtin getattr stays replaced for the whole
    # test body, exactly as before.
    @patch('__builtin__.getattr')
    def test_raises_exc_for_PyPy(self, patched_getattr):
        """RuntimeError is raised when getattr() always returns True."""
        patched_getattr.return_value = True
        with pytest.raises(RuntimeError):
            self.verify_runtime_environment(self.transport)

    def test_raises_exc_dep_missing(self):
        """RuntimeError is raised when dependency_is_none returns True."""
        with patch(QPID_MODULE + '.dependency_is_none') as dep_is_none:
            dep_is_none.return_value = True
            with pytest.raises(RuntimeError):
                self.verify_runtime_environment(self.transport)

    def test_calls_dependency_is_none(self):
        """The check consults dependency_is_none at least once."""
        with patch(QPID_MODULE + '.dependency_is_none') as dep_is_none:
            dep_is_none.return_value = False
            self.verify_runtime_environment(self.transport)
            dep_is_none.assert_called()

    def test_raises_no_exception(self):
        """With the class-level fixture active the check passes silently."""
        self.verify_runtime_environment(self.transport)
1814
1815
@skip.if_python3()
@skip.if_pypy()
@pytest.mark.usefixtures('disable_runtime_dependency_check')
class test_Transport(object):
    """General Transport behaviour tested against a mock client."""

    def setup(self):
        """Provide a fresh mock client for each test."""
        self.mock_client = Mock()

    def test_supports_ev(self):
        """The transport reports a truthy supports_ev."""
        transport = Transport(self.mock_client)
        assert transport.supports_ev

    def test_close_connection(self):
        """close_connection() calls close() once on the given connection."""
        connection = Mock()
        transport = Transport(self.mock_client)
        transport.close_connection(connection)
        connection.close.assert_called_once_with()

    def test_default_connection_params(self):
        """default_connection_params is localhost on port 5672."""
        transport = Transport(self.mock_client)
        expected = {
            'hostname': 'localhost',
            'port': 5672,
        }
        assert expected == transport.default_connection_params

    def test_del_sync(self):
        """__del__ on an unregistered transport never calls os.close."""
        with patch(QPID_MODULE + '.os.close') as os_close:
            transport = Transport(self.mock_client)
            transport.__del__()
            os_close.assert_not_called()

    def test_del_async(self):
        """__del__ on a registered transport calls os.close."""
        with patch(QPID_MODULE + '.os.close') as os_close:
            transport = Transport(self.mock_client)
            transport.register_with_event_loop(Mock(), Mock())
            transport.__del__()
            os_close.assert_called()

    def test_del_async_failed(self):
        """An OSError from os.close does not escape __del__."""
        with patch(QPID_MODULE + '.os.close') as os_close:
            os_close.side_effect = OSError()
            transport = Transport(self.mock_client)
            transport.register_with_event_loop(Mock(), Mock())
            transport.__del__()
            os_close.assert_called()
1866