1"""
2Tests for pika.channel.Channel
3"""
4import collections
5import logging
6import sys
7import unittest
8import warnings
9
10try:
11    from unittest import mock  # pylint: disable=C0412
12except ImportError:
13    import mock
14
15from pika import channel, connection, exceptions, frame, spec
16
# Disable pylint checks: protected-access, missing-docstring, invalid-name,
# too-many-public-methods, and too-many-lines
19# pylint: disable=W0212,C0111,C0103,R0904,C0302
20
class ConnectionTemplate(connection.Connection):
    """Template for using as mock spec_set for the pika Connection class. It
    defines members accessed by the code under test that would be defined in
    the base class's constructor.

    In the tests visible in this module the class is only handed to
    `mock.patch(..., autospec=ConnectionTemplate)`; it is not instantiated
    directly.
    """
    # Declared at class level so that a mock specced from this template
    # exposes a `callbacks` attribute (see the class docstring).
    callbacks = None

    # Suppress pylint warnings about specific abstract methods not being
    # overridden. Each name is simply re-bound to the attribute of the same
    # name on the base class, so no behavior is added or changed.
    _adapter_connect_stream = connection.Connection._adapter_connect_stream
    _adapter_disconnect_stream = connection.Connection._adapter_disconnect_stream
    _adapter_emit_data = connection.Connection._adapter_emit_data
    _adapter_call_later = connection.Connection._adapter_call_later
    _adapter_remove_timeout = connection.Connection._adapter_remove_timeout
    _adapter_add_callback_threadsafe = (
        connection.Connection._adapter_add_callback_threadsafe)
37
38
39class ChannelTests(unittest.TestCase):
40    @staticmethod
41    @mock.patch('pika.connection.Connection', autospec=ConnectionTemplate)
42    def _create_connection(connection_class_mock=None):
43        return connection_class_mock()
44
45    def setUp(self):
46        self.connection = self._create_connection()
47        self._on_openok_callback = mock.Mock()
48        self.obj = channel.Channel(self.connection, 1,
49                                   self._on_openok_callback)
50        warnings.resetwarnings()
51
52    def tearDown(self):
53        del self.connection
54        del self._on_openok_callback
55        del self.obj
56        warnings.resetwarnings()
57
58    def test_init_invalid_channel_number(self):
59        self.assertRaises(exceptions.InvalidChannelNumber, channel.Channel,
60                          'Foo', self.connection, lambda *args: None)
61
62    def test_init_channel_number(self):
63        self.assertEqual(self.obj.channel_number, 1)
64
65    def test_init_callbacks(self):
66        self.assertEqual(self.obj.callbacks, self.connection.callbacks)
67
68    def test_init_connection(self):
69        self.assertEqual(self.obj.connection, self.connection)
70
71    def test_init_content_frame_assembler(self):
72        self.assertIsInstance(self.obj._content_assembler,
73                              channel.ContentFrameAssembler)
74
75    def test_init_blocked(self):
76        self.assertIsInstance(self.obj._blocked, collections.deque)
77
78    def test_init_blocking(self):
79        self.assertEqual(self.obj._blocking, None)
80
81    def test_init_on_flowok_callback(self):
82        self.assertEqual(self.obj._on_flowok_callback, None)
83
84    def test_init_has_on_flow_callback(self):
85        self.assertEqual(self.obj._has_on_flow_callback, False)
86
87    def test_init_on_openok_callback(self):
88        self.assertEqual(self.obj._on_openok_callback,
89                         self._on_openok_callback)
90
91    def test_init_state(self):
92        self.assertEqual(self.obj._state, channel.Channel.CLOSED)
93
94    def test_init_cancelled(self):
95        self.assertIsInstance(self.obj._cancelled, set)
96
97    def test_init_consumers(self):
98        self.assertEqual(self.obj._consumers, dict())
99
100    def test_init_flow(self):
101        self.assertEqual(self.obj.flow_active, True)
102
103    def test_init_on_getok_callback(self):
104        self.assertEqual(self.obj._on_getok_callback, None)
105
106    def test_add_callback(self):
107        mock_callback = mock.Mock()
108        self.obj.add_callback(mock_callback, [spec.Basic.Qos])
109        self.connection.callbacks.add.assert_called_once_with(
110            self.obj.channel_number, spec.Basic.Qos, mock_callback, True)
111
112    def test_add_callback_multiple_replies(self):
113        mock_callback = mock.Mock()
114        self.obj.add_callback(mock_callback,
115                              [spec.Basic.Qos, spec.Basic.QosOk])
116        calls = [
117            mock.call(self.obj.channel_number, spec.Basic.Qos, mock_callback,
118                      True),
119            mock.call(self.obj.channel_number, spec.Basic.QosOk, mock_callback,
120                      True)
121        ]
122        self.connection.callbacks.add.assert_has_calls(calls)
123
124    def test_add_on_cancel_callback(self):
125        mock_callback = mock.Mock()
126        self.obj.add_on_cancel_callback(mock_callback)
127        self.connection.callbacks.add.assert_called_once_with(
128            self.obj.channel_number, spec.Basic.Cancel, mock_callback, False)
129
130    def test_add_on_close_callback(self):
131        mock_callback = mock.Mock()
132        self.obj.add_on_close_callback(mock_callback)
133        self.connection.callbacks.add.assert_called_once_with(
134            self.obj.channel_number, '_on_channel_close', mock_callback, False,
135            self.obj)
136
137    def test_add_on_flow_callback(self):
138        mock_callback = mock.Mock()
139        self.obj.add_on_flow_callback(mock_callback)
140        self.connection.callbacks.add.assert_called_once_with(
141            self.obj.channel_number, spec.Channel.Flow, mock_callback, False)
142
143    def test_add_on_return_callback(self):
144        mock_callback = mock.Mock()
145        self.obj.add_on_return_callback(mock_callback)
146        self.connection.callbacks.add.assert_called_once_with(
147            self.obj.channel_number, '_on_return', mock_callback, False)
148
149    def test_basic_ack_channel_closed(self):
150        self.assertRaises(exceptions.ChannelWrongStateError, self.obj.basic_ack)
151
152    @mock.patch('pika.spec.Basic.Ack')
153    @mock.patch('pika.channel.Channel._send_method')
154    def test_basic_ack_calls_send_method(self, send_method, _unused):
155        self.obj._set_state(self.obj.OPEN)
156        self.obj.basic_ack(1, False)
157        send_method.assert_called_once_with(spec.Basic.Ack(1, False))
158
159    def test_basic_cancel_asynch(self):
160        self.obj._set_state(self.obj.OPEN)
161        self.obj._consumers['ctag0'] = logging.debug
162        self.obj._rpc = mock.Mock(wraps=self.obj._rpc)
163        self.obj.basic_cancel(consumer_tag='ctag0')
164
165        self.assertTrue(self.obj._rpc.called)
166        self.assertFalse(self.obj.callbacks.add.called)
167
168    def test_basic_cancel_asynch_with_user_callback_raises_value_error(self):
169        self.obj._set_state(self.obj.OPEN)
170        consumer_tag = 'ctag0'
171        callback_mock = mock.Mock()
172        self.obj._consumers[consumer_tag] = callback_mock
173        self.assertRaises(
174            TypeError,
175            self.obj.basic_cancel,
176            consumer_tag,
177            callback='bad-callback')
178
179    @mock.patch('pika.channel.Channel._raise_if_not_open')
180    def test_basic_cancel_calls_raise_if_not_open(self, raise_if_not_open):
181        self.obj._set_state(self.obj.OPEN)
182        consumer_tag = 'ctag0'
183        callback_mock = mock.Mock()
184        self.obj._consumers[consumer_tag] = mock.Mock()
185
186        self.obj.basic_cancel(consumer_tag, callback=callback_mock)
187
188        raise_if_not_open.assert_called_once_with()
189
190    def test_basic_cancel_synch(self):
191        self.obj._set_state(self.obj.OPEN)
192        consumer_tag = 'ctag0'
193        callback_mock = mock.Mock()
194        self.obj._consumers[consumer_tag] = mock.Mock()
195
196        self.obj.basic_cancel(consumer_tag, callback=callback_mock)
197
198        # Verify consumer tag added to the cancelled list
199        self.assertListEqual(list(self.obj._cancelled), [consumer_tag])
200        # Verify user completion callback registered
201        self.obj.callbacks.add.assert_any_call(
202            self.obj.channel_number, spec.Basic.CancelOk, callback_mock)
203        # Verify Channel._on_cancelok callback registered
204        self.obj.callbacks.add.assert_any_call(
205            self.obj.channel_number,
206            spec.Basic.CancelOk,
207            self.obj._on_cancelok,
208            arguments={
209                'consumer_tag': 'ctag0'
210            })
211        # Verify Channel._on_synchronous_complete callback registered
212        self.obj.callbacks.add.assert_any_call(
213            self.obj.channel_number,
214            spec.Basic.CancelOk,
215            self.obj._on_synchronous_complete,
216            arguments={
217                'consumer_tag': 'ctag0'
218            })
219
220    def test_basic_cancel_synch_no_user_callback_raises_value_error(self):
221        self.obj._set_state(self.obj.OPEN)
222        self.obj._consumers['ctag0'] = mock.Mock()
223
224        with self.assertRaises(TypeError):
225            self.obj.basic_cancel(consumer_tag='ctag0', callback='bad-callback')
226
227        # Verify arg error detected raised without making state changes
228        self.assertIn('ctag0', self.obj._consumers)
229        self.assertEqual(len(self.obj._cancelled), 0)
230
231    def test_basic_cancel_then_close(self):
232        self.obj._set_state(self.obj.OPEN)
233        callback_mock = mock.Mock()
234        consumer_tag = 'ctag0'
235        self.obj._consumers[consumer_tag] = mock.Mock()
236        self.obj.basic_cancel(consumer_tag, callback=callback_mock)
237        try:
238            self.obj.close()
239        except exceptions.ChannelClosed:
240            self.fail('unable to cancel consumers as channel is closing')
241        self.assertTrue(self.obj.is_closing)
242
243    @mock.patch('pika.channel.Channel._rpc')
244    def test_basic_cancel_unknown_consumer_tag(self, rpc):
245        self.obj._set_state(self.obj.OPEN)
246        callback_mock = mock.Mock()
247        consumer_tag = 'ctag0'
248        self.obj.basic_cancel(consumer_tag, callback=callback_mock)
249        self.assertFalse(rpc.called)
250
251    def test_basic_consume_legacy_parameter_queue(self):
252        # This is for the unlikely scenario where only
253        # the first parameter is updated
254        self.obj._set_state(self.obj.OPEN)
255        with self.assertRaises(TypeError):
256            self.obj.basic_consume('queue',
257                                   'whoops this should be a callback')
258
259    def test_basic_consume_legacy_parameter_callback(self):
260        self.obj._set_state(self.obj.OPEN)
261        callback_mock = mock.Mock()
262        with self.assertRaises(TypeError):
263            self.obj.basic_consume(callback_mock, 'queue')
264
265    def test_queue_declare_legacy_parameter_callback(self):
266        self.obj._set_state(self.obj.OPEN)
267        callback_mock = mock.Mock()
268        with self.assertRaises(TypeError):
269            self.obj.queue_declare(callback_mock, 'queue')
270
271    def test_exchange_declare_legacy_parameter_callback(self):
272        self.obj._set_state(self.obj.OPEN)
273        callback_mock = mock.Mock()
274        with self.assertRaises(TypeError):
275            self.obj.exchange_declare(callback_mock, 'exchange')
276
277    def test_queue_bind_legacy_parameter_callback(self):
278        self.obj._set_state(self.obj.OPEN)
279        callback_mock = mock.Mock()
280        with self.assertRaises(TypeError):
281            self.obj.queue_bind(callback_mock,
282                                'queue',
283                                'exchange')
284
285    def test_basic_cancel_legacy_parameter(self):
286        self.obj._set_state(self.obj.OPEN)
287        callback_mock = mock.Mock()
288        with self.assertRaises(TypeError):
289            self.obj.basic_cancel(callback_mock, 'tag')
290
291    def test_basic_get_legacy_parameter(self):
292        self.obj._set_state(self.obj.OPEN)
293        callback_mock = mock.Mock()
294        with self.assertRaises(TypeError):
295            self.obj.basic_get(callback_mock)
296
297    def test_basic_qos_legacy_parameter(self):
298        self.obj._set_state(self.obj.OPEN)
299        callback_mock = mock.Mock()
300        with self.assertRaises(TypeError):
301            self.obj.basic_get(callback_mock, 0, 0, False)
302
303    def test_basic_recover_legacy_parameter(self):
304        self.obj._set_state(self.obj.OPEN)
305        callback_mock = mock.Mock()
306        with self.assertRaises(TypeError):
307            self.obj.basic_recover(callback_mock, True)
308
309    def test_confirm_delivery_legacy_no_parameters(self):
310        self.obj._set_state(self.obj.OPEN)
311        with self.assertRaises(TypeError):
312            self.obj.confirm_delivery()
313
314    def test_confirm_delivery_legacy_nowait_parameter(self):
315        self.obj._set_state(self.obj.OPEN)
316        callback_mock = mock.Mock()
317        with self.assertRaises(TypeError):
318            self.obj.confirm_delivery(callback_mock, True)
319
320    def test_exchange_bind_legacy_parameter(self):
321        self.obj._set_state(self.obj.OPEN)
322        callback_mock = mock.Mock()
323        with self.assertRaises(TypeError):
324            self.obj.exchange_bind(callback_mock,
325                                   'destination',
326                                   'source',
327                                   'routing_key',
328                                   True)
329
330    def test_exchange_delete_legacy_parameter(self):
331        self.obj._set_state(self.obj.OPEN)
332        callback_mock = mock.Mock()
333        with self.assertRaises(TypeError):
334            self.obj.exchange_delete(callback_mock,
335                                     'exchange',
336                                     True)
337
338    def test_exchange_unbind_legacy_parameter(self):
339        self.obj._set_state(self.obj.OPEN)
340        callback_mock = mock.Mock()
341        with self.assertRaises(TypeError):
342            self.obj.exchange_unbind(callback_mock,
343                                     'destination',
344                                     'source',
345                                     'routing_key',
346                                     True)
347
348    def test_flow_legacy_parameter(self):
349        self.obj._set_state(self.obj.OPEN)
350        callback_mock = mock.Mock()
351        with self.assertRaises(TypeError):
352            self.obj.flow(callback_mock, True)
353
354    def test_queue_delete_legacy_parameter(self):
355        self.obj._set_state(self.obj.OPEN)
356        callback_mock = mock.Mock()
357        with self.assertRaises(TypeError):
358            self.obj.queue_delete(callback_mock,
359                                  'queue',
360                                  True,
361                                  True)
362
363    def test_queue_unbind_legacy_parameter(self):
364        self.obj._set_state(self.obj.OPEN)
365        callback_mock = mock.Mock()
366        with self.assertRaises(TypeError):
367            self.obj.queue_unbind(callback_mock,
368                                  'queue',
369                                  'exchange',
370                                  'routing_key')
371
372    def test_queue_purge_legacy_parameter(self):
373        self.obj._set_state(self.obj.OPEN)
374        callback_mock = mock.Mock()
375        with self.assertRaises(TypeError):
376            self.obj.queue_purge(callback_mock,
377                                 'queue',
378                                 True)
379
380    def test_basic_consume_channel_closed(self):
381        mock_callback = mock.Mock()
382        mock_on_msg_callback = mock.Mock()
383        self.assertRaises(exceptions.ChannelWrongStateError,
384                          self.obj.basic_consume,
385                          'test-queue', mock_on_msg_callback,
386                          callback=mock_callback)
387
388    @mock.patch('pika.channel.Channel._raise_if_not_open')
389    def test_basic_consume_calls_raise_if_not_open(self, raise_if_not_open):
390        self.obj._set_state(self.obj.OPEN)
391        mock_callback = mock.Mock()
392        mock_on_msg_callback = mock.Mock()
393        self.obj.basic_consume('test-queue', mock_on_msg_callback, callback=mock_callback)
394        raise_if_not_open.assert_called_once_with()
395
396    def test_basic_consume_consumer_tag_no_completion_callback(self):
397        self.obj._set_state(self.obj.OPEN)
398        expectation = 'ctag1.'
399        mock_on_msg_callback = mock.Mock()
400        consumer_tag = self.obj.basic_consume('test-queue',
401                                              mock_on_msg_callback)[:6]
402        self.assertEqual(consumer_tag, expectation)
403
404    def test_basic_consume_consumer_tag_with_completion_callback(self):
405        self.obj._set_state(self.obj.OPEN)
406        expectation = 'ctag1.'
407        mock_callback = mock.Mock()
408        mock_on_msg_callback = mock.Mock()
409        consumer_tag = self.obj.basic_consume('test-queue',
410                                              mock_on_msg_callback,
411                                              callback=mock_callback)[:6]
412        self.assertEqual(consumer_tag, expectation)
413
414    def test_basic_consume_consumer_tag_cancelled_full(self):
415        self.obj._set_state(self.obj.OPEN)
416        expectation = 'ctag1.'
417        mock_on_msg_callback = mock.Mock()
418        for ctag in ['ctag1.%i' % ii for ii in range(11)]:
419            self.obj._cancelled.add(ctag)
420        self.assertEqual(
421            self.obj.basic_consume('test-queue', mock_on_msg_callback)[:6],
422            expectation)
423
424    def test_basic_consume_consumer_tag_in_consumers(self):
425        self.obj._set_state(self.obj.OPEN)
426        consumer_tag = 'ctag1.0'
427        mock_on_msg_callback = mock.Mock()
428        mock_callback = mock.Mock()
429        self.obj.basic_consume(
430            'test-queue', mock_on_msg_callback,
431            consumer_tag=consumer_tag, callback=mock_callback)
432        self.assertIn(consumer_tag, self.obj._consumers)
433
434    def test_basic_consume_duplicate_consumer_tag_raises(self):
435        self.obj._set_state(self.obj.OPEN)
436        consumer_tag = 'ctag1.0'
437        mock_on_msg_callback = mock.Mock()
438        mock_callback = mock.Mock()
439        self.obj._consumers[consumer_tag] = logging.debug
440        self.assertRaises(exceptions.DuplicateConsumerTag,
441                          self.obj.basic_consume, 'test-queue',
442                          mock_on_msg_callback, False, False,
443                          consumer_tag, None, mock_callback)
444
445    def test_basic_consume_consumers_callback_value(self):
446        self.obj._set_state(self.obj.OPEN)
447        consumer_tag = 'ctag1.0'
448        mock_on_msg_callback = mock.Mock()
449        self.obj.basic_consume(
450            'test-queue', mock_on_msg_callback, consumer_tag=consumer_tag)
451        self.assertEqual(self.obj._consumers[consumer_tag], mock_on_msg_callback)
452
453    @mock.patch('pika.spec.Basic.Consume')
454    @mock.patch('pika.channel.Channel._rpc')
455    def test_basic_consume_consumers_rpc_with_no_completion_callback(self, rpc, _unused):
456        self.obj._set_state(self.obj.OPEN)
457        consumer_tag = 'ctag1.0'
458        mock_on_msg_callback = mock.Mock()
459        self.obj.basic_consume(
460            'test-queue', mock_on_msg_callback,
461            consumer_tag=consumer_tag)
462        expectation = spec.Basic.Consume(
463            queue='test-queue',
464            consumer_tag=consumer_tag,
465            no_ack=False,
466            exclusive=False)
467        rpc.assert_called_once_with(expectation, self.obj._on_eventok,
468                                    [(spec.Basic.ConsumeOk, {
469                                        'consumer_tag': consumer_tag
470                                    })])
471
472    @mock.patch('pika.spec.Basic.Consume')
473    @mock.patch('pika.channel.Channel._rpc')
474    def test_basic_consume_consumers_rpc_with_completion_callback(self, rpc, _unused):
475        self.obj._set_state(self.obj.OPEN)
476        consumer_tag = 'ctag1.0'
477        mock_on_msg_callback = mock.Mock()
478        mock_callback = mock.Mock()
479        self.obj.basic_consume(
480            'test-queue', mock_on_msg_callback,
481            consumer_tag=consumer_tag, callback=mock_callback)
482        expectation = spec.Basic.Consume(
483            queue='test-queue',
484            consumer_tag=consumer_tag,
485            no_ack=False,
486            exclusive=False)
487        rpc.assert_called_once_with(expectation, mock_callback,
488                                    [(spec.Basic.ConsumeOk, {
489                                        'consumer_tag': consumer_tag
490                                    })])
491
492    def test_basic_get_requires_callback(self):
493        self.obj._set_state(self.obj.OPEN)
494        with self.assertRaises(TypeError):
495            self.obj.basic_get('test-queue', None)
496
497    @mock.patch('pika.channel.Channel._send_method')
498    def test_basic_get_callback(self, _unused):
499        self.obj._set_state(self.obj.OPEN)
500        mock_callback = mock.Mock()
501        self.obj.basic_get('test-queue', mock_callback)
502        self.assertEqual(self.obj._on_getok_callback, mock_callback)
503
504    @mock.patch('pika.spec.Basic.Get')
505    @mock.patch('pika.channel.Channel._send_method')
506    def test_basic_get_send_method_called(self, send_method, _unused):
507        self.obj._set_state(self.obj.OPEN)
508        mock_callback = mock.Mock()
509        self.obj.basic_get('test-queue', mock_callback)
510        send_method.assert_called_once_with(
511            spec.Basic.Get(queue='test-queue', no_ack=False))
512
513    @mock.patch('pika.spec.Basic.Get')
514    @mock.patch('pika.channel.Channel._send_method')
515    def test_basic_get_send_method_called_auto_ack(self, send_method, _unused):
516        self.obj._set_state(self.obj.OPEN)
517        mock_callback = mock.Mock()
518        self.obj.basic_get('test-queue', mock_callback, auto_ack=True)
519        send_method.assert_called_once_with(
520            spec.Basic.Get(queue='test-queue', no_ack=True))
521
522    def test_basic_nack_raises_channel_wrong_state(self):
523        self.assertRaises(exceptions.ChannelWrongStateError,
524                          self.obj.basic_nack, 0, False, True)
525
526    @mock.patch('pika.spec.Basic.Nack')
527    @mock.patch('pika.channel.Channel._send_method')
528    def test_basic_nack_send_method_request(self, send_method, _unused):
529        self.obj._set_state(self.obj.OPEN)
530        self.obj.basic_nack(1, False, True)
531        send_method.assert_called_once_with(spec.Basic.Nack(1, False, True))
532
533    def test_basic_publish_raises_channel_wrong_state(self):
534        self.assertRaises(exceptions.ChannelWrongStateError,
535                          self.obj.basic_publish, 'foo', 'bar', 'baz')
536
537    @mock.patch('pika.spec.Basic.Publish')
538    @mock.patch('pika.channel.Channel._send_method')
539    def test_basic_publish_send_method_request(self, send_method, _unused):
540        self.obj._set_state(self.obj.OPEN)
541        exchange = 'basic_publish_test'
542        routing_key = 'routing-key-fun'
543        body = b'This is my body'
544        properties = spec.BasicProperties(content_type='text/plain')
545        mandatory = False
546        self.obj.basic_publish(exchange, routing_key, body, properties,
547                               mandatory)
548        send_method.assert_called_once_with(
549            spec.Basic.Publish(
550                exchange=exchange,
551                routing_key=routing_key,
552                mandatory=mandatory), (properties, body))
553
554    def test_basic_qos_raises_channel_wrong_state(self):
555        self.assertRaises(exceptions.ChannelWrongStateError,
556                          self.obj.basic_qos, 0, False, True)
557
558    def test_basic_qos_invalid_prefetch_size_raises_error(self):
559        self.obj._set_state(self.obj.OPEN)
560        with self.assertRaises(ValueError) as ex:
561            self.obj.basic_qos('foo', 123)
562        self.assertEqual("invalid literal for int() with base 10: 'foo'",
563                         ex.exception.args[0])
564        with self.assertRaises(ValueError) as ex:
565            self.obj.basic_qos(-1, 123)
566        self.assertIn('prefetch_size', ex.exception.args[0])
567
568    def test_basic_qos_invalid_prefetch_count_raises_error(self):
569        self.obj._set_state(self.obj.OPEN)
570        with self.assertRaises(ValueError) as ex:
571            self.obj.basic_qos(123, 'foo')
572        self.assertEqual("invalid literal for int() with base 10: 'foo'",
573                         ex.exception.args[0])
574        with self.assertRaises(ValueError) as ex:
575            self.obj.basic_qos(123, -1)
576        self.assertIn('prefetch_count', ex.exception.args[0])
577
578    @mock.patch('pika.spec.Basic.Qos')
579    @mock.patch('pika.channel.Channel._rpc')
580    def test_basic_qos_rpc_request(self, rpc, _unused):
581        self.obj._set_state(self.obj.OPEN)
582        mock_callback = mock.Mock()
583        self.obj.basic_qos(10, 20, False, callback=mock_callback)
584        rpc.assert_called_once_with(
585            spec.Basic.Qos(10, 20, False), mock_callback, [spec.Basic.QosOk])
586
587    def test_basic_reject_raises_channel_wrong_state(self):
588        self.assertRaises(exceptions.ChannelWrongStateError,
589                          self.obj.basic_reject, 1, False)
590
591    @mock.patch('pika.spec.Basic.Reject')
592    @mock.patch('pika.channel.Channel._send_method')
593    def test_basic_reject_send_method_request_with_int_tag(
594            self, send_method, _unused):
595        self.obj._set_state(self.obj.OPEN)
596        self.obj.basic_reject(1, True)
597        send_method.assert_called_once_with(spec.Basic.Reject(1, True))
598
599    def test_basic_reject_spec_with_int_tag(self):
600        decoded = spec.Basic.Reject()
601        decoded.decode(b''.join(spec.Basic.Reject(1, True).encode()))
602
603        self.assertEqual(decoded.delivery_tag, 1)
604        self.assertIs(decoded.requeue, True)
605
606    @mock.patch('pika.spec.Basic.Reject')
607    @mock.patch('pika.channel.Channel._send_method')
608    def test_basic_reject_send_method_request_with_long_tag(
609            self, send_method, _unused):
610        self.obj._set_state(self.obj.OPEN)
611
612        # NOTE: we use `sys.maxsize` for compatibility with python 3, which
613        # doesn't have `sys.maxint`
614        self.obj.basic_reject(sys.maxsize, True)
615        send_method.assert_called_once_with(
616            spec.Basic.Reject(sys.maxsize, True))
617
618    def test_basic_reject_spec_with_long_tag(self):
619        # NOTE: we use `sys.maxsize` for compatibility with python 3, which
620        # doesn't have `sys.maxint`
621        decoded = spec.Basic.Reject()
622        decoded.decode(b''.join(spec.Basic.Reject(sys.maxsize, True).encode()))
623
624        self.assertEqual(decoded.delivery_tag, sys.maxsize)
625        self.assertIs(decoded.requeue, True)
626
627    def test_basic_recover_raises_channel_wrong_state(self):
628        self.assertRaises(exceptions.ChannelWrongStateError,
629                          self.obj.basic_qos, 0, False, True)
630
631    @mock.patch('pika.spec.Basic.Recover')
632    @mock.patch('pika.channel.Channel._rpc')
633    def test_basic_recover_rpc_request(self, rpc, _unused):
634        self.obj._set_state(self.obj.OPEN)
635        mock_callback = mock.Mock()
636        self.obj.basic_recover(True, callback=mock_callback)
637        rpc.assert_called_once_with(
638            spec.Basic.Recover(True), mock_callback, [spec.Basic.RecoverOk])
639
640    def test_close_in_closing_state_raises_channel_wrong_state(self):
641        self.obj._set_state(self.obj.CLOSING)
642        self.assertRaises(exceptions.ChannelWrongStateError, self.obj.close)
643        self.assertTrue(self.obj.is_closing)
644
645    def test_close_in_closed_state_raises_channel_wrong_state_and_stays_closed(
646            self):
647        self.assertTrue(self.obj.is_closed)
648        self.assertRaises(exceptions.ChannelWrongStateError, self.obj.close)
649        self.assertTrue(self.obj.is_closed)
650
651    @mock.patch('pika.spec.Channel.Close')
652    def test_close_in_opening_state(self, _unused):
653        self.obj._set_state(self.obj.OPENING)
654        self.obj._rpc = mock.Mock(wraps=self.obj._rpc)
655        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
656
657        # close() called by user
658        self.obj.close(200, 'Got to go')
659
660        self.obj._rpc.assert_called_once_with(
661            spec.Channel.Close(200, 'Got to go', 0, 0), self.obj._on_closeok,
662            [spec.Channel.CloseOk])
663
664        self.assertEqual(self.obj._closing_reason.reply_code, 200)
665        self.assertEqual(self.obj._closing_reason.reply_text, 'Got to go')
666        self.assertEqual(self.obj._state, self.obj.CLOSING)
667
668        # OpenOk method from broker
669        self.obj._on_openok(
670            frame.Method(self.obj.channel_number,
671                         spec.Channel.OpenOk(self.obj.channel_number)))
672        self.assertEqual(self.obj._state, self.obj.CLOSING)
673        self.assertEqual(self.obj.callbacks.process.call_count, 0)
674
675        # CloseOk method from broker
676        self.obj._on_closeok(
677            frame.Method(self.obj.channel_number, spec.Channel.CloseOk()))
678        self.assertEqual(self.obj._state, self.obj.CLOSED)
679
680        self.obj.callbacks.process.assert_any_call(self.obj.channel_number,
681                                                   '_on_channel_close',
682                                                   self.obj, self.obj,
683                                                   mock.ANY)
684
685        self.assertEqual(self.obj._cleanup.call_count, 1)
686
687    def test_close_in_open_state_transitions_to_closing(self):
688        self.obj._set_state(self.obj.OPEN)
689        self.obj.close()
690        self.assertEqual(self.obj._state, channel.Channel.CLOSING)
691
692    def test_close_basic_cancel_called(self):
693        self.obj._set_state(self.obj.OPEN)
694        self.obj._consumers['abc'] = None
695        with mock.patch.object(self.obj, 'basic_cancel') as basic_cancel:
696            self.obj.close()
697            # this is actually not necessary but Pika currently cancels
698            # every consumer before closing the channel
699            basic_cancel.assert_called_once_with(consumer_tag='abc')
700
701    def test_confirm_delivery_with_bad_callback_raises_value_error(self):
702        self.assertRaises(ValueError,
703                          self.obj.confirm_delivery,
704                          'bad-callback')
705
706    def test_confirm_delivery_raises_channel_wrong_state(self):
707        cb = mock.Mock()
708        self.assertRaises(exceptions.ChannelWrongStateError,
709                          self.obj.confirm_delivery, cb)
710
711    def test_confirm_delivery_raises_method_not_implemented_for_confirms(self):
712        self.obj._set_state(self.obj.OPEN)
713        # Since connection is a mock.Mock, overwrite the method def with False
714        self.obj.connection.publisher_confirms = False
715        self.assertRaises(exceptions.MethodNotImplemented,
716                          self.obj.confirm_delivery, logging.debug)
717
718    def test_confirm_delivery_raises_method_not_implemented_for_nack(self):
719        self.obj._set_state(self.obj.OPEN)
720        # Since connection is a mock.Mock, overwrite the method def with False
721        self.obj.connection.basic_nack = False
722        self.assertRaises(exceptions.MethodNotImplemented,
723                          self.obj.confirm_delivery, logging.debug)
724
725    def test_confirm_delivery_async(self):
726        self.obj._set_state(self.obj.OPEN)
727        user_ack_nack_callback = mock.Mock()
728        self.obj.confirm_delivery(ack_nack_callback=user_ack_nack_callback)
729
730        self.assertEqual(self.obj.callbacks.add.call_count, 2)
731        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
732                                               spec.Basic.Ack,
733                                               user_ack_nack_callback, False)
734        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
735                                               spec.Basic.Nack,
736                                               user_ack_nack_callback, False)
737
738    def test_confirm_delivery_callback_without_nowait_selectok(self):
739        self.obj._set_state(self.obj.OPEN)
740        expectation = [
741            self.obj.channel_number,
742            spec.Confirm.SelectOk,
743            self.obj._on_selectok
744        ]
745        self.obj.confirm_delivery(ack_nack_callback=logging.debug,
746                                  callback=self.obj._on_selectok)
747        self.obj.callbacks.add.assert_called_with(*expectation, arguments=None)
748
749    def test_confirm_delivery_callback_basic_ack(self):
750        self.obj._set_state(self.obj.OPEN)
751        expectation = (self.obj.channel_number, spec.Basic.Ack, logging.debug,
752                       False)
753        self.obj.confirm_delivery(ack_nack_callback=logging.debug)
754        self.obj.callbacks.add.assert_any_call(*expectation)
755
756    def test_confirm_delivery_callback_basic_nack(self):
757        self.obj._set_state(self.obj.OPEN)
758        expectation = (self.obj.channel_number, spec.Basic.Nack, logging.debug,
759                       False)
760        self.obj.confirm_delivery(ack_nack_callback=logging.debug)
761        self.obj.callbacks.add.assert_any_call(*expectation)
762
763    def test_confirm_delivery_no_callback_callback_call_count(self):
764        self.obj._set_state(self.obj.OPEN)
765        user_ack_nack_callback = mock.Mock()
766        self.obj.confirm_delivery(ack_nack_callback=user_ack_nack_callback,
767                                  callback=self.obj._on_selectok)
768        expectation = [
769            mock.call(
770                *[
771                    self.obj.channel_number,
772                    spec.Basic.Ack,
773                    user_ack_nack_callback,
774                    False
775                ]),
776            mock.call(
777                *[
778                    self.obj.channel_number,
779                    spec.Basic.Nack,
780                    user_ack_nack_callback,
781                    False
782                ]),
783            mock.call(
784                *[
785                    self.obj.channel_number,
786                    spec.Confirm.SelectOk,
787                    self.obj._on_synchronous_complete
788                ],
789                arguments=None),
790            mock.call(
791                *[
792                    self.obj.channel_number,
793                    spec.Confirm.SelectOk,
794                    self.obj._on_selectok,
795                ],
796                arguments=None)
797        ]
798        self.assertEqual(self.obj.callbacks.add.call_args_list, expectation)
799
800    def test_confirm_delivery_callback_yes_basic_ack_callback(self):
801        self.obj._set_state(self.obj.OPEN)
802        user_callback = mock.Mock()
803        expectation = [self.obj.channel_number, spec.Basic.Ack, user_callback, False]
804        expectation_item = mock.call(*expectation)
805        self.obj.confirm_delivery(ack_nack_callback=user_callback)
806        self.assertIn(expectation_item, self.obj.callbacks.add.call_args_list)
807
808    def test_confirm_delivery_callback_yes_basic_nack_callback(self):
809        self.obj._set_state(self.obj.OPEN)
810        user_callback = mock.Mock()
811        expectation = [self.obj.channel_number, spec.Basic.Nack, user_callback, False]
812        expectation_item = mock.call(*expectation)
813        self.obj.confirm_delivery(ack_nack_callback=user_callback)
814        self.assertIn(expectation_item, self.obj.callbacks.add.call_args_list)
815
816    def test_consumer_tags(self):
817        self.assertListEqual(self.obj.consumer_tags,
818                             list(self.obj._consumers.keys()))
819
820    def test_exchange_bind_raises_channel_wrong_state(self):
821        self.assertRaises(exceptions.ChannelWrongStateError,
822                          self.obj.exchange_bind,
823                          'foo', 'bar', 'baz', None, None)
824
825    def test_exchange_bind_raises_value_error_on_invalid_callback(self):
826        self.obj._set_state(self.obj.OPEN)
827        self.assertRaises(TypeError, self.obj.exchange_bind,
828                          'foo', 'bar', 'baz', None, 'callback')
829
830    @mock.patch('pika.spec.Exchange.Bind')
831    @mock.patch('pika.channel.Channel._rpc')
832    def test_exchange_bind_rpc_request(self, rpc, _unused):
833        self.obj._set_state(self.obj.OPEN)
834        mock_callback = mock.Mock()
835        self.obj.exchange_bind('foo', 'bar', 'baz', callback=mock_callback)
836        rpc.assert_called_once_with(
837            spec.Exchange.Bind(0, 'foo', 'bar', 'baz'), mock_callback,
838            [spec.Exchange.BindOk])
839
840    @mock.patch('pika.spec.Exchange.Bind')
841    @mock.patch('pika.channel.Channel._rpc')
842    def test_exchange_bind_rpc_request_nowait(self, rpc, _unused):
843        self.obj._set_state(self.obj.OPEN)
844        self.obj.exchange_bind('foo', 'bar', 'baz', callback=None)
845        rpc.assert_called_once_with(
846            spec.Exchange.Bind(0, 'foo', 'bar', 'baz'), None, [])
847
848    def test_exchange_declare_raises_channel_wrong_state(self):
849        self.assertRaises(
850            exceptions.ChannelWrongStateError,
851            self.obj.exchange_declare,
852            exchange='foo')
853
854    def test_exchange_declare_raises_value_error_on_invalid_callback(self):
855        self.obj._set_state(self.obj.OPEN)
856        self.assertRaises(TypeError, self.obj.exchange_declare,
857                          'foo', callback='callback')
858
859    @mock.patch('pika.spec.Exchange.Declare')
860    @mock.patch('pika.channel.Channel._rpc')
861    def test_exchange_declare_rpc_request(self, rpc, _unused):
862        self.obj._set_state(self.obj.OPEN)
863        mock_callback = mock.Mock()
864        self.obj.exchange_declare('foo', callback=mock_callback)
865        rpc.assert_called_once_with(
866            spec.Exchange.Declare(0, 'foo'), mock_callback,
867            [spec.Exchange.DeclareOk])
868
869    @mock.patch('pika.spec.Exchange.Declare')
870    @mock.patch('pika.channel.Channel._rpc')
871    def test_exchange_declare_rpc_request_nowait(self, rpc, _unused):
872        self.obj._set_state(self.obj.OPEN)
873        self.obj.exchange_declare('foo', callback=None)
874        rpc.assert_called_once_with(
875            spec.Exchange.Declare(0, 'foo'), None, [])
876
877    def test_exchange_delete_raises_channel_wrong_state(self):
878        self.assertRaises(
879            exceptions.ChannelWrongStateError,
880            self.obj.exchange_delete, exchange='foo')
881
882    def test_exchange_delete_raises_value_error_on_invalid_callback(self):
883        self.obj._set_state(self.obj.OPEN)
884        self.assertRaises(TypeError, self.obj.exchange_delete,
885                          'foo', callback='callback')
886
887    @mock.patch('pika.spec.Exchange.Delete')
888    @mock.patch('pika.channel.Channel._rpc')
889    def test_exchange_delete_rpc_request(self, rpc, _unused):
890        self.obj._set_state(self.obj.OPEN)
891        mock_callback = mock.Mock()
892        self.obj.exchange_delete('foo', callback=mock_callback)
893        rpc.assert_called_once_with(
894            spec.Exchange.Delete(0, 'foo'), mock_callback,
895            [spec.Exchange.DeleteOk])
896
897    @mock.patch('pika.spec.Exchange.Delete')
898    @mock.patch('pika.channel.Channel._rpc')
899    def test_exchange_delete_rpc_request_nowait(self, rpc, _unused):
900        self.obj._set_state(self.obj.OPEN)
901        self.obj.exchange_delete('foo', callback=None)
902        rpc.assert_called_once_with(
903            spec.Exchange.Delete(0, 'foo'), None, [])
904
905    def test_exchange_unbind_raises_channel_wrong_state(self):
906        self.assertRaises(exceptions.ChannelWrongStateError,
907                          self.obj.exchange_unbind,
908                          None, 'foo', 'bar', 'baz')
909
910    def test_exchange_unbind_raises_value_error_on_invalid_callback(self):
911        self.obj._set_state(self.obj.OPEN)
912        self.assertRaises(TypeError, self.obj.exchange_unbind,
913                          'foo', 'bar', 'baz', callback='callback')
914
915    @mock.patch('pika.spec.Exchange.Unbind')
916    @mock.patch('pika.channel.Channel._rpc')
917    def test_exchange_unbind_rpc_request(self, rpc, _unused):
918        self.obj._set_state(self.obj.OPEN)
919        mock_callback = mock.Mock()
920        self.obj.exchange_unbind('foo', 'bar', 'baz', callback=mock_callback)
921        rpc.assert_called_once_with(
922            spec.Exchange.Unbind(0, 'foo', 'bar', 'baz'), mock_callback,
923            [spec.Exchange.UnbindOk])
924
925    @mock.patch('pika.spec.Exchange.Unbind')
926    @mock.patch('pika.channel.Channel._rpc')
927    def test_exchange_unbind_rpc_request_nowait(self, rpc, _unused):
928        self.obj._set_state(self.obj.OPEN)
929        mock_callback = mock.Mock()
930        self.obj.exchange_unbind(
931            mock_callback, 'foo', 'bar', 'baz', callback=None)
932        rpc.assert_called_once_with(
933            spec.Exchange.Unbind(0, 'foo', 'bar', 'baz'), None, [])
934
935    def test_flow_raises_channel_wrong_state(self):
936        self.assertRaises(exceptions.ChannelWrongStateError,
937                          self.obj.flow, True, 'foo')
938
939    def test_flow_raises_invalid_callback(self):
940        self.obj._set_state(self.obj.OPEN)
941        self.assertRaises(TypeError, self.obj.flow, True, 'foo')
942
943    @mock.patch('pika.spec.Channel.Flow')
944    @mock.patch('pika.channel.Channel._rpc')
945    def test_flow_on_rpc_request(self, rpc, _unused):
946        self.obj._set_state(self.obj.OPEN)
947        mock_callback = mock.Mock()
948        self.obj.flow(True, callback=mock_callback)
949        rpc.assert_called_once_with(
950            spec.Channel.Flow(True), self.obj._on_flowok,
951            [spec.Channel.FlowOk])
952
953    @mock.patch('pika.spec.Channel.Flow')
954    @mock.patch('pika.channel.Channel._rpc')
955    def test_flow_off_rpc_request(self, rpc, _unused):
956        self.obj._set_state(self.obj.OPEN)
957        mock_callback = mock.Mock()
958        self.obj.flow(False, callback=mock_callback)
959        rpc.assert_called_once_with(
960            spec.Channel.Flow(False), self.obj._on_flowok,
961            [spec.Channel.FlowOk])
962
963    @mock.patch('pika.channel.Channel._rpc')
964    def test_flow_on_flowok_callback(self, _rpc):
965        self.obj._set_state(self.obj.OPEN)
966        mock_callback = mock.Mock()
967        self.obj.flow(True, callback=mock_callback)
968        self.assertEqual(self.obj._on_flowok_callback, mock_callback)
969
970    def test_is_closed_true(self):
971        self.obj._set_state(self.obj.CLOSED)
972        self.assertTrue(self.obj.is_closed)
973
974    def test_is_closed_false(self):
975        self.obj._set_state(self.obj.OPEN)
976        self.assertFalse(self.obj.is_closed)
977
978    def test_is_closing_true(self):
979        self.obj._set_state(self.obj.CLOSING)
980        self.assertTrue(self.obj.is_closing)
981
982    def test_is_closing_false(self):
983        self.obj._set_state(self.obj.OPEN)
984        self.assertFalse(self.obj.is_closing)
985
986    @mock.patch('pika.channel.Channel._rpc')
987    def test_channel_open_add_callbacks_called(self, _rpc):
988        with mock.patch.object(self.obj, '_add_callbacks') as _add_callbacks:
989            self.obj.open()
990            _add_callbacks.assert_called_once_with()
991
992    def test_queue_bind_raises_channel_wrong_state(self):
993        self.assertRaises(exceptions.ChannelWrongStateError,
994                          self.obj.queue_bind, '',
995                          'foo', 'bar', 'baz')
996
997    def test_queue_bind_raises_value_error_on_invalid_callback(self):
998        self.obj._set_state(self.obj.OPEN)
999        self.assertRaises(TypeError, self.obj.queue_bind,
1000                          'foo', 'bar', 'baz', callback='callback')
1001
1002    @mock.patch('pika.spec.Queue.Bind')
1003    @mock.patch('pika.channel.Channel._rpc')
1004    def test_queue_bind_rpc_request(self, rpc, _unused):
1005        self.obj._set_state(self.obj.OPEN)
1006        mock_callback = mock.Mock()
1007        self.obj.queue_bind('foo', 'bar', 'baz', callback=mock_callback)
1008        rpc.assert_called_once_with(
1009            spec.Queue.Bind(0, 'foo', 'bar', 'baz'), mock_callback,
1010            [spec.Queue.BindOk])
1011
1012    @mock.patch('pika.spec.Queue.Bind')
1013    @mock.patch('pika.channel.Channel._rpc')
1014    def test_queue_bind_rpc_request_nowait(self, rpc, _unused):
1015        self.obj._set_state(self.obj.OPEN)
1016        self.obj.queue_bind('foo', 'bar', 'baz', callback=None)
1017        rpc.assert_called_once_with(
1018            spec.Queue.Bind(0, 'foo', 'bar', 'baz'), None, [])
1019
1020    def test_queue_declare_raises_channel_wrong_state(self):
1021        self.assertRaises(
1022            exceptions.ChannelWrongStateError,
1023            self.obj.queue_declare,
1024            queue='foo',
1025            callback=None)
1026
1027    def test_queue_declare_raises_value_error_on_invalid_callback(self):
1028        self.obj._set_state(self.obj.OPEN)
1029        self.assertRaises(TypeError, self.obj.queue_declare,
1030                          'foo', callback='callback')
1031
1032    @mock.patch('pika.spec.Queue.Declare')
1033    @mock.patch('pika.channel.Channel._rpc')
1034    def test_queue_declare_rpc_request(self, rpc, _unused):
1035        self.obj._set_state(self.obj.OPEN)
1036        mock_callback = mock.Mock()
1037        self.obj.queue_declare('foo', callback=mock_callback)
1038        rpc.assert_called_once_with(
1039            spec.Queue.Declare(0, 'foo'), mock_callback,
1040            [(spec.Queue.DeclareOk, {
1041                'queue': 'foo'
1042            })])
1043
1044    @mock.patch('pika.spec.Queue.Declare')
1045    @mock.patch('pika.channel.Channel._rpc')
1046    def test_queue_declare_rpc_request_nowait(self, rpc, _unused):
1047        self.obj._set_state(self.obj.OPEN)
1048        self.obj.queue_declare('foo', callback=None)
1049        rpc.assert_called_once_with(
1050            spec.Queue.Declare(0, 'foo'), None, [])
1051
1052    def test_queue_delete_raises_channel_wrong_state(self):
1053        self.assertRaises(
1054            exceptions.ChannelWrongStateError,
1055            self.obj.queue_delete, queue='foo')
1056
1057    def test_queue_delete_raises_value_error_on_invalid_callback(self):
1058        self.obj._set_state(self.obj.OPEN)
1059        self.assertRaises(TypeError, self.obj.queue_delete,
1060                          'foo', callback='callback')
1061
1062    @mock.patch('pika.spec.Queue.Delete')
1063    @mock.patch('pika.channel.Channel._rpc')
1064    def test_queue_delete_rpc_request(self, rpc, _unused):
1065        self.obj._set_state(self.obj.OPEN)
1066        mock_callback = mock.Mock()
1067        self.obj.queue_delete('foo', callback=mock_callback)
1068        rpc.assert_called_once_with(
1069            spec.Queue.Delete(0, 'foo'), mock_callback, [spec.Queue.DeleteOk])
1070
1071    @mock.patch('pika.spec.Queue.Delete')
1072    @mock.patch('pika.channel.Channel._rpc')
1073    def test_queue_delete_rpc_request_nowait(self, rpc, _unused):
1074        self.obj._set_state(self.obj.OPEN)
1075        self.obj.queue_delete('foo', callback=None)
1076        rpc.assert_called_once_with(
1077            spec.Queue.Delete(0, 'foo'), None, [])
1078
1079    def test_queue_purge_raises_channel_wrong_state(self):
1080        self.assertRaises(
1081            exceptions.ChannelWrongStateError,
1082            self.obj.queue_purge, queue='foo')
1083
1084    def test_queue_purge_raises_value_error_on_invalid_callback(self):
1085        self.obj._set_state(self.obj.OPEN)
1086        self.assertRaises(TypeError, self.obj.queue_purge,
1087                          'foo', callback='callback')
1088
1089    @mock.patch('pika.spec.Queue.Purge')
1090    @mock.patch('pika.channel.Channel._rpc')
1091    def test_queue_purge_rpc_request(self, rpc, _unused):
1092        self.obj._set_state(self.obj.OPEN)
1093        mock_callback = mock.Mock()
1094        self.obj.queue_purge('foo', callback=mock_callback)
1095        rpc.assert_called_once_with(
1096            spec.Queue.Purge(0, 'foo'), mock_callback, [spec.Queue.PurgeOk])
1097
1098    @mock.patch('pika.spec.Queue.Purge')
1099    @mock.patch('pika.channel.Channel._rpc')
1100    def test_queue_purge_rpc_request_nowait(self, rpc, _unused):
1101        self.obj._set_state(self.obj.OPEN)
1102        self.obj.queue_purge('foo', callback=None)
1103        rpc.assert_called_once_with(
1104            spec.Queue.Purge(0, 'foo'), None, [])
1105
1106    def test_queue_unbind_raises_channel_wrong_state(self):
1107        self.assertRaises(exceptions.ChannelWrongStateError,
1108                          self.obj.queue_unbind,
1109                          'foo', 'bar', 'baz', callback=None)
1110
1111    def test_queue_unbind_raises_value_error_on_invalid_callback(self):
1112        self.obj._set_state(self.obj.OPEN)
1113        self.assertRaises(TypeError, self.obj.queue_unbind, 'foo',
1114                          'bar', 'baz', callback='callback')
1115
1116    @mock.patch('pika.spec.Queue.Unbind')
1117    @mock.patch('pika.channel.Channel._rpc')
1118    def test_queue_unbind_rpc_request(self, rpc, _unused):
1119        self.obj._set_state(self.obj.OPEN)
1120        mock_callback = mock.Mock()
1121        self.obj.queue_unbind('foo', 'bar', 'baz', callback=mock_callback)
1122        rpc.assert_called_once_with(
1123            spec.Queue.Unbind(0, 'foo', 'bar', 'baz'), mock_callback,
1124            [spec.Queue.UnbindOk])
1125
1126    def test_tx_commit_raises_channel_wrong_state(self):
1127        self.assertRaises(exceptions.ChannelWrongStateError,
1128                          self.obj.tx_commit, None)
1129
1130    @mock.patch('pika.spec.Tx.Commit')
1131    @mock.patch('pika.channel.Channel._rpc')
1132    def test_tx_commit_rpc_request(self, rpc, _unused):
1133        self.obj._set_state(self.obj.OPEN)
1134        mock_callback = mock.Mock()
1135        self.obj.tx_commit(callback=mock_callback)
1136        rpc.assert_called_once_with(spec.Tx.Commit(), mock_callback,
1137                                    [spec.Tx.CommitOk])
1138
1139    @mock.patch('pika.spec.Tx.Rollback')
1140    @mock.patch('pika.channel.Channel._rpc')
1141    def test_tx_rollback_rpc_request(self, rpc, _unused):
1142        self.obj._set_state(self.obj.OPEN)
1143        mock_callback = mock.Mock()
1144        self.obj.tx_rollback(callback=mock_callback)
1145        rpc.assert_called_once_with(spec.Tx.Rollback(), mock_callback,
1146                                    [spec.Tx.RollbackOk])
1147
1148    @mock.patch('pika.spec.Tx.Select')
1149    @mock.patch('pika.channel.Channel._rpc')
1150    def test_tx_select_rpc_request(self, rpc, _unused):
1151        self.obj._set_state(self.obj.OPEN)
1152        mock_callback = mock.Mock()
1153        self.obj.tx_select(callback=mock_callback)
1154        rpc.assert_called_once_with(spec.Tx.Select(), mock_callback,
1155                                    [spec.Tx.SelectOk])
1156
1157    # Test internal methods
1158
1159    def test_add_callbacks_basic_cancel_empty_added(self):
1160        self.obj._add_callbacks()
1161        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
1162                                               spec.Basic.Cancel,
1163                                               self.obj._on_cancel, False)
1164
1165    def test_add_callbacks_basic_get_empty_added(self):
1166        self.obj._add_callbacks()
1167        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
1168                                               spec.Basic.GetEmpty,
1169                                               self.obj._on_getempty, False)
1170
1171    def test_add_callbacks_channel_close_added(self):
1172        self.obj._add_callbacks()
1173        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
1174                                               spec.Channel.Close,
1175                                               self.obj._on_close_from_broker,
1176                                               True)
1177
1178    def test_add_callbacks_channel_flow_added(self):
1179        self.obj._add_callbacks()
1180        self.obj.callbacks.add.assert_any_call(self.obj.channel_number,
1181                                               spec.Channel.Flow,
1182                                               self.obj._on_flow, False)
1183
1184    def test_cleanup(self):
1185        self.obj._cleanup()
1186        self.obj.callbacks.cleanup.assert_called_once_with(
1187            str(self.obj.channel_number))
1188
1189    def test_handle_content_frame_method_returns_none(self):
1190        frame_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1))
1191        self.assertEqual(self.obj._handle_content_frame(frame_value), None)
1192
1193    def test_handle_content_frame_sets_method_frame(self):
1194        frame_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1))
1195        self.obj._handle_content_frame(frame_value)
1196        self.assertEqual(self.obj._content_assembler._method_frame,
1197                         frame_value)
1198
1199    def test_handle_content_frame_sets_header_frame(self):
1200        frame_value = frame.Header(1, 10, spec.BasicProperties())
1201        self.obj._handle_content_frame(frame_value)
1202        self.assertEqual(self.obj._content_assembler._header_frame,
1203                         frame_value)
1204
1205    def test_handle_content_frame_basic_deliver_called(self):
1206        method_value = frame.Method(1, spec.Basic.Deliver('ctag0', 1))
1207        self.obj._handle_content_frame(method_value)
1208        header_value = frame.Header(1, 10, spec.BasicProperties())
1209        self.obj._handle_content_frame(header_value)
1210        body_value = frame.Body(1, b'0123456789')
1211        with mock.patch.object(self.obj, '_on_deliver') as deliver:
1212            self.obj._handle_content_frame(body_value)
1213            deliver.assert_called_once_with(method_value, header_value,
1214                                            b'0123456789')
1215
1216    def test_handle_content_frame_basic_get_called(self):
1217        method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1))
1218        self.obj._handle_content_frame(method_value)
1219        header_value = frame.Header(1, 10, spec.BasicProperties())
1220        self.obj._handle_content_frame(header_value)
1221        body_value = frame.Body(1, b'0123456789')
1222        with mock.patch.object(self.obj, '_on_getok') as getok:
1223            self.obj._handle_content_frame(body_value)
1224            getok.assert_called_once_with(method_value, header_value,
1225                                          b'0123456789')
1226
1227    def test_handle_content_frame_basic_return_called(self):
1228        method_value = frame.Method(1,
1229                                    spec.Basic.Return(999, 'Reply Text',
1230                                                      'exchange_value',
1231                                                      'routing.key'))
1232        self.obj._handle_content_frame(method_value)
1233        header_value = frame.Header(1, 10, spec.BasicProperties())
1234        self.obj._handle_content_frame(header_value)
1235        body_value = frame.Body(1, b'0123456789')
1236        with mock.patch.object(self.obj, '_on_return') as basic_return:
1237            self.obj._handle_content_frame(body_value)
1238            basic_return.assert_called_once_with(method_value, header_value,
1239                                                 b'0123456789')
1240
1241    def test_has_content_true(self):
1242        self.assertTrue(spec.has_content(spec.Basic.GetOk.INDEX))
1243
1244    def test_has_content_false(self):
1245        self.assertFalse(spec.has_content(spec.Basic.Ack.INDEX))
1246
1247    def test_on_cancel_not_appended_cancelled(self):
1248        consumer_tag = 'ctag0'
1249        frame_value = frame.Method(1, spec.Basic.Cancel(consumer_tag))
1250        self.obj._on_cancel(frame_value)
1251        self.assertNotIn(consumer_tag, self.obj._cancelled)
1252
1253    def test_on_cancel_removed_consumer(self):
1254        consumer_tag = 'ctag0'
1255        self.obj._consumers[consumer_tag] = logging.debug
1256        frame_value = frame.Method(1, spec.Basic.Cancel(consumer_tag))
1257        self.obj._on_cancel(frame_value)
1258        self.assertNotIn(consumer_tag, self.obj._consumers)
1259
1260    def test_on_cancelok_removed_consumer(self):
1261        consumer_tag = 'ctag0'
1262        self.obj._consumers[consumer_tag] = logging.debug
1263        frame_value = frame.Method(1, spec.Basic.CancelOk(consumer_tag))
1264        self.obj._on_cancelok(frame_value)
1265        self.assertNotIn(consumer_tag, self.obj._consumers)
1266
1267    @mock.patch('pika.spec.Channel.CloseOk')
1268    def test_on_close_from_broker_in_open_state(self, _unused):
1269        self.obj._set_state(self.obj.OPEN)
1270        self.obj._send_method = mock.Mock(wraps=self.obj._send_method)
1271        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1272
1273        method_frame = frame.Method(self.obj.channel_number,
1274                                    spec.Channel.Close(400, 'error'))
1275        self.obj._on_close_from_broker(method_frame)
1276
1277        self.assertTrue(self.obj.is_closed,
1278                        'Channel was not closed; state=%s' %
1279                        (self.obj._state, ))
1280
1281        self.obj._send_method.assert_called_once_with(spec.Channel.CloseOk())
1282
1283        self.obj.callbacks.process.assert_any_call(
1284            self.obj.channel_number, '_on_channel_close', self.obj, self.obj,
1285            mock.ANY)
1286
1287        reason = self.obj.callbacks.process.call_args_list[0][0][4]
1288        self.assertIsInstance(reason, exceptions.ChannelClosedByBroker)
1289        self.assertEqual((reason.reply_code, reason.reply_text),
1290                         (400, 'error'))
1291
1292        self.assertEqual(self.obj._cleanup.call_count, 1)
1293
1294    def test_on_close_from_broker_in_closing_state(self):
1295        self.obj._set_state(self.obj.CLOSING)
1296        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1297
1298        method_frame = frame.Method(self.obj.channel_number,
1299                                    spec.Channel.Close(400, 'error'))
1300        self.obj._on_close_from_broker(method_frame)
1301
1302        # Verify didn't alter state (will wait for CloseOk)
1303        self.assertTrue(self.obj.is_closing,
1304                        'Channel was not closed; state=%s' %
1305                        (self.obj._state, ))
1306
1307        self.assertIsInstance(self.obj._closing_reason,
1308                              exceptions.ChannelClosedByBroker)
1309        self.assertEqual(self.obj._closing_reason.reply_code, 400)
1310        self.assertEqual(self.obj._closing_reason.reply_text, 'error')
1311
1312        self.assertFalse(self.obj.callbacks.process.called,
1313                         self.obj.callbacks.process.call_args_list)
1314
1315        self.assertFalse(self.obj._cleanup.called)
1316
1317    @mock.patch('logging.Logger.warning')
1318    def test_on_close_from_broker_warning(self, warning):
1319        self.obj._state = channel.Channel.OPEN
1320        method_frame = frame.Method(self.obj.channel_number,
1321                                    spec.Channel.Close(999, 'Test_Value'))
1322        self.obj._on_close_from_broker(method_frame)
1323        warning.assert_called_once_with(
1324            'Received remote Channel.Close (%s): %r on %s',
1325            method_frame.method.reply_code, method_frame.method.reply_text,
1326            self.obj)
1327
1328        self.assertIsInstance(self.obj._closing_reason,
1329                              exceptions.ChannelClosedByBroker)
1330
1331    def _verify_on_close_meta_transitions_to_closed(self, initial_state):
1332        self.obj._set_state(initial_state)
1333        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1334
1335        reason = Exception('Oops')
1336        self.obj._on_close_meta(reason)
1337
1338        self.assertTrue(self.obj.is_closed)
1339
1340        self.assertEqual(self.obj._cleanup.call_count, 1)
1341
1342        self.assertEqual(self.obj.callbacks.process.call_count, 2)
1343
1344        self.obj.callbacks.process.assert_any_call(
1345            self.obj.channel_number, '_on_channel_close', self.obj, self.obj,
1346            reason)
1347
1348        self.obj.callbacks.process.assert_any_call(
1349            self.obj.channel_number, self.obj._ON_CHANNEL_CLEANUP_CB_KEY,
1350            self.obj, self.obj)
1351
1352    def test_on_close_meta_in_opening_state_transitions_to_closed(self):
1353        self._verify_on_close_meta_transitions_to_closed(self.obj.OPENING)
1354
1355    def test_on_close_meta_in_open_state_transitions_to_closed(self):
1356        self._verify_on_close_meta_transitions_to_closed(self.obj.OPEN)
1357
1358    def test_on_close_meta_in_closing_state_transitions_to_closed(self):
1359        self._verify_on_close_meta_transitions_to_closed(self.obj.CLOSING)
1360
1361    def test_on_close_meta_in_closed_state_is_suppressed(self):
1362        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1363        self.obj._set_state(self.obj.CLOSED)
1364
1365        self.obj._on_close_meta(Exception('Internal error'))
1366
1367        self.assertTrue(self.obj.is_closed)
1368        self.assertEqual(self.obj.callbacks.process.call_count, 0)
1369        self.assertEqual(self.obj._cleanup.call_count, 0)
1370
1371    def test_on_deliver_callback_called(self):
1372        self.obj._set_state(self.obj.OPEN)
1373        consumer_tag = 'ctag0'
1374        mock_callback = mock.Mock()
1375        self.obj._consumers[consumer_tag] = mock_callback
1376        method_value = frame.Method(1, spec.Basic.Deliver(consumer_tag, 1))
1377        header_value = frame.Header(1, 10, spec.BasicProperties())
1378        body_value = b'0123456789'
1379        self.obj._on_deliver(method_value, header_value, body_value)
1380        mock_callback.assert_called_with(self.obj, method_value.method,
1381                                         header_value.properties, body_value)
1382
1383    def test_on_closeok(self):
1384        self.obj._set_state(self.obj.OPEN)
1385        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1386
1387        # Close from user
1388        self.obj.close(200, 'All is well')
1389        self.assertEqual(self.obj._closing_reason.reply_code, 200)
1390        self.assertEqual(self.obj._closing_reason.reply_text, 'All is well')
1391        self.assertEqual(self.obj._state, self.obj.CLOSING)
1392
1393        self.obj._on_closeok(
1394            frame.Method(self.obj.channel_number, spec.Channel.CloseOk()))
1395
1396        self.assertTrue(self.obj.is_closed,
1397                        'Channel was not closed; state=%s' %
1398                        (self.obj._state, ))
1399
1400        self.obj.callbacks.process.assert_any_call(self.obj.channel_number,
1401                                                   '_on_channel_close',
1402                                                   self.obj, self.obj,
1403                                                   mock.ANY)
1404        reason = self.obj.callbacks.process.call_args_list[0][0][4]
1405        self.assertIsInstance(reason, exceptions.ChannelClosedByClient)
1406        self.assertEqual((reason.reply_code, reason.reply_text),
1407                         (200, 'All is well'))
1408
1409        self.assertEqual(self.obj._cleanup.call_count, 1)
1410
1411    def test_on_closeok_following_close_from_broker(self):
1412        self.obj._set_state(self.obj.OPEN)
1413        self.obj._cleanup = mock.Mock(wraps=self.obj._cleanup)
1414
1415        # Close from user
1416        self.obj.close(0, 'All is well')
1417        self.assertEqual(self.obj._closing_reason.reply_code, 0)
1418        self.assertEqual(self.obj._closing_reason.reply_text, 'All is well')
1419        self.assertEqual(self.obj._state, self.obj.CLOSING)
1420
1421        # Close from broker before Channel.CloseOk
1422        self.obj._on_close_from_broker(
1423            frame.Method(self.obj.channel_number,
1424                         spec.Channel.Close(400,
1425                                            'broker is having a bad day')))
1426
1427        self.assertIsInstance(self.obj._closing_reason,
1428                              exceptions.ChannelClosedByBroker)
1429        self.assertEqual(
1430            (self.obj._closing_reason.reply_code,
1431             self.obj._closing_reason.reply_text),
1432            (400, 'broker is having a bad day'))
1433        self.assertEqual(self.obj._state, self.obj.CLOSING)
1434
1435        self.obj._on_closeok(
1436            frame.Method(self.obj.channel_number, spec.Channel.CloseOk()))
1437
1438        # Verify this completes closing of the channel
1439        self.assertTrue(self.obj.is_closed,
1440                        'Channel was not closed; state=%s' %
1441                        (self.obj._state, ))
1442
1443        self.assertEqual(self.obj.callbacks.process.call_count, 2)
1444
1445        self.obj.callbacks.process.assert_any_call(
1446            self.obj.channel_number, '_on_channel_close', self.obj, self.obj,
1447            mock.ANY)
1448
1449        self.assertEqual(self.obj._cleanup.call_count, 1)
1450
1451    @mock.patch('logging.Logger.debug')
1452    def test_on_getempty(self, debug):
1453        method_frame = frame.Method(self.obj.channel_number,
1454                                    spec.Basic.GetEmpty)
1455        self.obj._on_getempty(method_frame)
1456        debug.assert_called_with('Received Basic.GetEmpty: %r', method_frame)
1457
1458    @mock.patch('logging.Logger.error')
1459    def test_on_getok_no_callback(self, error):
1460        method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1))
1461        header_value = frame.Header(1, 10, spec.BasicProperties())
1462        body_value = b'0123456789'
1463        self.obj._on_getok(method_value, header_value, body_value)
1464        error.assert_called_with(
1465            'Basic.GetOk received with no active callback')
1466
1467    def test_on_getok_callback_called(self):
1468        mock_callback = mock.Mock()
1469        self.obj._on_getok_callback = mock_callback
1470        method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1))
1471        header_value = frame.Header(1, 10, spec.BasicProperties())
1472        body_value = b'0123456789'
1473        self.obj._on_getok(method_value, header_value, body_value)
1474        mock_callback.assert_called_once_with(
1475            self.obj, method_value.method, header_value.properties, body_value)
1476
1477    def test_on_getok_callback_reset(self):
1478        mock_callback = mock.Mock()
1479        self.obj._on_getok_callback = mock_callback
1480        method_value = frame.Method(1, spec.Basic.GetOk('ctag0', 1))
1481        header_value = frame.Header(1, 10, spec.BasicProperties())
1482        body_value = b'0123456789'
1483        self.obj._on_getok(method_value, header_value, body_value)
1484        self.assertIsNone(self.obj._on_getok_callback)
1485
1486    @mock.patch('logging.Logger.debug')
1487    def test_on_confirm_selectok(self, debug):
1488        method_frame = frame.Method(self.obj.channel_number,
1489                                    spec.Confirm.SelectOk())
1490        self.obj._on_selectok(method_frame)
1491        debug.assert_called_with('Confirm.SelectOk Received: %r', method_frame)
1492
1493    @mock.patch('logging.Logger.debug')
1494    def test_on_eventok(self, debug):
1495        method_frame = frame.Method(self.obj.channel_number,
1496                                    spec.Basic.GetEmpty())
1497        self.obj._on_eventok(method_frame)
1498        debug.assert_called_with('Discarding frame %r', method_frame)
1499
1500    @mock.patch('logging.Logger.warning')
1501    def test_on_flow(self, warning):
1502        self.obj._has_on_flow_callback = False
1503        method_frame = frame.Method(self.obj.channel_number,
1504                                    spec.Channel.Flow())
1505        self.obj._on_flow(method_frame)
1506        warning.assert_called_with('Channel.Flow received from server')
1507
1508    @mock.patch('logging.Logger.warning')
1509    def test_on_flow_with_callback(self, warning):
1510        method_frame = frame.Method(self.obj.channel_number,
1511                                    spec.Channel.Flow())
1512        self.obj._on_flowok_callback = logging.debug
1513        self.obj._on_flow(method_frame)
1514        self.assertEqual(len(warning.call_args_list), 1)
1515
1516    @mock.patch('logging.Logger.warning')
1517    def test_on_flowok(self, warning):
1518        method_frame = frame.Method(self.obj.channel_number,
1519                                    spec.Channel.FlowOk())
1520        self.obj._on_flowok(method_frame)
1521        warning.assert_called_with('Channel.FlowOk received with no active '
1522                                   'callbacks')
1523
1524    def test_on_flowok_calls_callback(self):
1525        method_frame = frame.Method(self.obj.channel_number,
1526                                    spec.Channel.FlowOk())
1527        mock_callback = mock.Mock()
1528        self.obj._on_flowok_callback = mock_callback
1529        self.obj._on_flowok(method_frame)
1530        mock_callback.assert_called_once_with(method_frame.method.active)
1531
1532    def test_on_flowok_callback_reset(self):
1533        method_frame = frame.Method(self.obj.channel_number,
1534                                    spec.Channel.FlowOk())
1535        mock_callback = mock.Mock()
1536        self.obj._on_flowok_callback = mock_callback
1537        self.obj._on_flowok(method_frame)
1538        self.assertIsNone(self.obj._on_flowok_callback)
1539
1540    def test_on_openok_no_callback(self):
1541        self.obj._on_openok_callback = None
1542        method_value = frame.Method(1, spec.Channel.OpenOk())
1543        self.obj._on_openok(method_value)
1544        self.assertEqual(self.obj._state, self.obj.OPEN)
1545
1546    def test_on_openok_callback_called(self):
1547        mock_callback = mock.Mock()
1548        self.obj._on_openok_callback = mock_callback
1549        method_value = frame.Method(1, spec.Channel.OpenOk())
1550        self.obj._on_openok(method_value)
1551        mock_callback.assert_called_once_with(self.obj)
1552
1553    def test_onreturn(self):
1554        method_value = frame.Method(1,
1555                                    spec.Basic.Return(999, 'Reply Text',
1556                                                      'exchange_value',
1557                                                      'routing.key'))
1558        header_value = frame.Header(1, 10, spec.BasicProperties())
1559        body_value = frame.Body(1, b'0123456789')
1560        self.obj._on_return(method_value, header_value, body_value)
1561        self.obj.callbacks.process.assert_called_with(
1562            self.obj.channel_number, '_on_return', self.obj, self.obj,
1563            method_value.method, header_value.properties, body_value)
1564
1565    @mock.patch('logging.Logger.warning')
1566    def test_onreturn_warning(self, warning):
1567        method_value = frame.Method(1,
1568                                    spec.Basic.Return(999, 'Reply Text',
1569                                                      'exchange_value',
1570                                                      'routing.key'))
1571        header_value = frame.Header(1, 10, spec.BasicProperties())
1572        body_value = frame.Body(1, b'0123456789')
1573        self.obj.callbacks.process.return_value = False
1574        self.obj._on_return(method_value, header_value, body_value)
1575        warning.assert_called_with(
1576            'Basic.Return received from server (%r, %r)', method_value.method,
1577            header_value.properties)
1578
1579    @mock.patch('pika.channel.Channel._rpc')
1580    def test_on_synchronous_complete(self, rpc):
1581        mock_callback = mock.Mock()
1582        expectation = [
1583            spec.Queue.Unbind(0, 'foo', 'bar', 'baz'), mock_callback,
1584            [spec.Queue.UnbindOk]
1585        ]
1586        self.obj._blocked = collections.deque([expectation])
1587        self.obj._on_synchronous_complete(
1588            frame.Method(self.obj.channel_number, spec.Basic.Ack(1)))
1589        rpc.assert_called_once_with(*expectation)
1590
1591    def test_repr(self):
1592        text = repr(self.obj)
1593        self.assertTrue(text.startswith('<Channel'), text)
1594
1595    def test_rpc_raises_channel_wrong_state(self):
1596        self.assertRaises(exceptions.ChannelWrongStateError, self.obj._rpc,
1597                          spec.Basic.Cancel('tag_abc'))
1598
1599    def test_rpc_while_blocking_appends_blocked_collection(self):
1600        self.obj._set_state(self.obj.OPEN)
1601        self.obj._blocking = spec.Confirm.Select()
1602        acceptable_replies = [(spec.Basic.CancelOk, {
1603            'consumer_tag': 'tag_abc'
1604        })]
1605        expectation = [
1606            spec.Basic.Cancel('tag_abc'), lambda *args: None,
1607            acceptable_replies
1608        ]
1609        self.obj._rpc(*expectation)
1610        self.assertIn(expectation, self.obj._blocked)
1611
1612    def test_rpc_throws_value_error_with_unacceptable_replies(self):
1613        self.obj._set_state(self.obj.OPEN)
1614        self.assertRaises(TypeError, self.obj._rpc,
1615                          spec.Basic.Cancel('tag_abc'), logging.debug, 'Foo')
1616
1617    def test_rpc_throws_type_error_with_invalid_callback(self):
1618        self.obj._set_state(self.obj.OPEN)
1619        self.assertRaises(TypeError, self.obj._rpc, spec.Channel.Open(1),
1620                          ['foo'], [spec.Channel.OpenOk])
1621
1622    def test_rpc_enters_blocking_and_adds_on_synchronous_complete(self):
1623        self.obj._set_state(self.obj.OPEN)
1624        method_frame = spec.Channel.Open()
1625        self.obj._rpc(method_frame, None, [spec.Channel.OpenOk])
1626        self.assertEqual(self.obj._blocking, method_frame.NAME)
1627        self.obj.callbacks.add.assert_called_with(
1628            self.obj.channel_number,
1629            spec.Channel.OpenOk,
1630            self.obj._on_synchronous_complete,
1631            arguments=None)
1632
1633    def test_rpc_not_blocking_and_no_on_synchronous_complete_when_no_replies(
1634            self):
1635        self.obj._set_state(self.obj.OPEN)
1636        method_frame = spec.Channel.Open()
1637        self.obj._rpc(method_frame, None, acceptable_replies=[])
1638        self.assertIsNone(self.obj._blocking)
1639        with self.assertRaises(AssertionError):
1640            self.obj.callbacks.add.assert_called_with(
1641                mock.ANY,
1642                mock.ANY,
1643                self.obj._on_synchronous_complete,
1644                arguments=mock.ANY)
1645
1646    def test_rpc_adds_callback(self):
1647        self.obj._set_state(self.obj.OPEN)
1648        method_frame = spec.Channel.Open()
1649        mock_callback = mock.Mock()
1650        self.obj._rpc(method_frame, mock_callback, [spec.Channel.OpenOk])
1651        self.obj.callbacks.add.assert_called_with(
1652            self.obj.channel_number,
1653            spec.Channel.OpenOk,
1654            mock_callback,
1655            arguments=None)
1656
1657    def test_send_method(self):
1658        expectation = [2, 3]
1659        self.obj._send_method(*expectation)
1660        self.obj.connection._send_method.assert_called_once_with(
1661                *[self.obj.channel_number] + expectation)
1662
1663    def test_set_state(self):
1664        self.obj._state = channel.Channel.CLOSED
1665        self.obj._set_state(channel.Channel.OPENING)
1666        self.assertEqual(self.obj._state, channel.Channel.OPENING)
1667
1668    def test_raise_if_not_open_raises_channel_wrong_state(self):
1669        self.assertRaises(exceptions.ChannelWrongStateError,
1670                          self.obj._raise_if_not_open)
1671
1672    def test_no_side_effects_from_send_method_error(self):
1673        self.obj._set_state(self.obj.OPEN)
1674
1675        self.assertIsNone(self.obj._blocking)
1676
1677        with mock.patch.object(self.obj.callbacks, 'add') as cb_add_mock:
1678            with mock.patch.object(self.obj, '_send_method',
1679                                   side_effect=TypeError) as send_method_mock:
1680                with self.assertRaises(TypeError):
1681                    self.obj.queue_delete('', callback=lambda _frame: None)
1682
1683        self.assertEqual(send_method_mock.call_count, 1)
1684        self.assertIsNone(self.obj._blocking)
1685        self.assertEqual(cb_add_mock.call_count, 0)
1686