1"""blocking adapter test"""
2from datetime import datetime
3import functools
4import logging
5import socket
6import threading
7import unittest
8import uuid
9
10import pika
11from pika.adapters import blocking_connection
12from pika.compat import as_bytes, time_now
13import pika.connection
14import pika.exceptions
15
16from ..forward_server import ForwardServer
17from .test_utils import retry_assertion
18
19# too-many-lines
20# pylint: disable=C0302
21
22# Disable warning about access to protected member
23# pylint: disable=W0212
24
25# Disable warning Attribute defined outside __init__
26# pylint: disable=W0201
27
28# Disable warning Missing docstring
29# pylint: disable=C0111
30
31# Disable warning Too many public methods
32# pylint: disable=R0904
33
34# Disable warning Invalid variable name
35# pylint: disable=C0103
36
37
38LOGGER = logging.getLogger(__name__)
39
40PARAMS_URL_TEMPLATE = (
41    'amqp://guest:guest@127.0.0.1:%(port)s/%%2f?socket_timeout=1')
42DEFAULT_URL = PARAMS_URL_TEMPLATE % {'port': 5672}
43DEFAULT_PARAMS = pika.URLParameters(DEFAULT_URL)
44DEFAULT_TIMEOUT = 15
45
46
47
48def setUpModule():
49    logging.basicConfig(level=logging.DEBUG)
50
51
52class BlockingTestCaseBase(unittest.TestCase):
53
54    TIMEOUT = DEFAULT_TIMEOUT
55
56    def _connect(self,
57                 url=DEFAULT_URL,
58                 connection_class=pika.BlockingConnection,
59                 impl_class=None):
60        parameters = pika.URLParameters(url)
61
62        return self._connect_params(parameters,
63                                    connection_class,
64                                    impl_class)
65
66    def _connect_params(self,
67                        parameters,
68                        connection_class=pika.BlockingConnection,
69                        impl_class=None):
70
71        connection = connection_class(parameters, _impl_class=impl_class)
72        self.addCleanup(lambda: connection.close()
73                        if connection.is_open else None)
74
75        # We use impl's timer directly in order to get a callback regardless
76        # of BlockingConnection's event dispatch modality
77        connection._impl._adapter_call_later(self.TIMEOUT, # pylint: disable=E1101
78                                             self._on_test_timeout)
79
80        # Patch calls into I/O loop to fail test if exceptions are
81        # leaked back through SelectConnection or the I/O loop.
82        self._instrument_io_loop_exception_leak_detection(connection)
83
84        return connection
85
86    def _instrument_io_loop_exception_leak_detection(self, connection):
87        """Instrument the given connection to detect and fail test when
88        an exception is leaked through the I/O loop
89
90        NOTE: BlockingConnection's underlying asynchronous connection adapter
91        (SelectConnection) uses callbacks to communicate with its user (
92        BlockingConnection in this case). If BlockingConnection leaks
93        exceptions back into the I/O loop or the asynchronous connection
94        adapter, we interrupt their normal workflow and introduce a high
95        likelihood of state inconsistency.
96        """
97        # Patch calls into I/O loop to fail test if exceptions are
98        # leaked back through SelectConnection or the I/O loop.
99        real_poll = connection._impl.ioloop.poll
100        def my_poll(*args, **kwargs):
101            try:
102                return real_poll(*args, **kwargs)
103            except BaseException as exc:
104                self.fail('Unwanted exception leaked into asynchronous layer '
105                          'via ioloop.poll(): {!r}'.format(exc))
106
107        connection._impl.ioloop.poll = my_poll
108        self.addCleanup(setattr, connection._impl.ioloop, 'poll', real_poll)
109
110        real_process_timeouts = connection._impl.ioloop.process_timeouts
111        def my_process_timeouts(*args, **kwargs):
112            try:
113                return real_process_timeouts(*args, **kwargs)
114            except AssertionError:
115                # Our test timeout logic and unit test assert* routines rely
116                # on being able to pass AssertionError
117                raise
118            except BaseException as exc:
119                self.fail('Unwanted exception leaked into asynchronous layer '
120                          'via ioloop.process_timeouts(): {!r}'.format(exc))
121
122        connection._impl.ioloop.process_timeouts = my_process_timeouts
123        self.addCleanup(setattr, connection._impl.ioloop, 'process_timeouts',
124                        real_process_timeouts)
125
126    def _on_test_timeout(self):
127        """Called when test times out"""
128        LOGGER.info('%s TIMED OUT (%s)', datetime.utcnow(), self)
129        self.fail('Test timed out')
130
131    @retry_assertion(TIMEOUT/2)
132    def _assert_exact_message_count_with_retries(self,
133                                                 channel,
134                                                 queue,
135                                                 expected_count):
136        frame = channel.queue_declare(queue, passive=True)
137        self.assertEqual(frame.method.message_count, expected_count)
138
139
140class TestCreateAndCloseConnection(BlockingTestCaseBase):
141
142    def test(self):
143        """BlockingConnection: Create and close connection"""
144        connection = self._connect()
145        self.assertIsInstance(connection, pika.BlockingConnection)
146        self.assertTrue(connection.is_open)
147        self.assertFalse(connection.is_closed)
148        self.assertFalse(connection._impl.is_closing)
149
150        connection.close()
151        self.assertTrue(connection.is_closed)
152        self.assertFalse(connection.is_open)
153        self.assertFalse(connection._impl.is_closing)
154
155
156class TestCreateConnectionWithNoneSocketAndStackTimeouts(BlockingTestCaseBase):
157
158    def test(self):
159        """ BlockingConnection: create a connection with socket and stack timeouts both None
160
161        """
162        params = pika.URLParameters(DEFAULT_URL)
163        params.socket_timeout = None
164        params.stack_timeout = None
165
166        with self._connect_params(params) as connection:
167            self.assertTrue(connection.is_open)
168
169
170class TestCreateConnectionFromTwoConfigsFirstUnreachable(BlockingTestCaseBase):
171
172    def test(self):
173        """ BlockingConnection: create a connection from two configs, first unreachable
174
175        """
176        # Reserve a port for use in connect
177        sock = socket.socket()
178        self.addCleanup(sock.close)
179
180        sock.bind(('127.0.0.1', 0))
181
182        port = sock.getsockname()[1]
183
184        sock.close()
185
186        bad_params = pika.URLParameters(PARAMS_URL_TEMPLATE % {"port": port})
187        good_params = pika.URLParameters(DEFAULT_URL)
188
189        with self._connect_params([bad_params, good_params]) as connection:
190            self.assertNotEqual(connection._impl.params.port, bad_params.port)
191            self.assertEqual(connection._impl.params.port, good_params.port)
192
193
194class TestCreateConnectionFromTwoUnreachableConfigs(BlockingTestCaseBase):
195
196    def test(self):
197        """ BlockingConnection: creating a connection from two unreachable \
198        configs raises AMQPConnectionError
199
200        """
201        # Reserve a port for use in connect
202        sock = socket.socket()
203        self.addCleanup(sock.close)
204
205        sock.bind(('127.0.0.1', 0))
206
207        port = sock.getsockname()[1]
208
209        sock.close()
210
211        bad_params = pika.URLParameters(PARAMS_URL_TEMPLATE % {"port": port})
212
213        with self.assertRaises(pika.exceptions.AMQPConnectionError):
214            self._connect_params([bad_params, bad_params])
215
216
217class TestMultiCloseConnectionRaisesWrongState(BlockingTestCaseBase):
218
219    def test(self):
220        """BlockingConnection: Close connection twice raises ConnectionWrongStateError"""
221        connection = self._connect()
222        self.assertIsInstance(connection, pika.BlockingConnection)
223        self.assertTrue(connection.is_open)
224        self.assertFalse(connection.is_closed)
225        self.assertFalse(connection._impl.is_closing)
226
227        connection.close()
228        self.assertTrue(connection.is_closed)
229        self.assertFalse(connection.is_open)
230        self.assertFalse(connection._impl.is_closing)
231
232        with self.assertRaises(pika.exceptions.ConnectionWrongStateError):
233            connection.close()
234
235
236class TestConnectionContextManagerClosesConnection(BlockingTestCaseBase):
237    def test(self):
238        """BlockingConnection: connection context manager closes connection"""
239        with self._connect() as connection:
240            self.assertIsInstance(connection, pika.BlockingConnection)
241            self.assertTrue(connection.is_open)
242
243        self.assertTrue(connection.is_closed)
244
245
246class TestConnectionContextManagerExitSurvivesClosedConnection(BlockingTestCaseBase):
247    def test(self):
248        """BlockingConnection: connection context manager exit survives closed connection"""
249        with self._connect() as connection:
250            self.assertTrue(connection.is_open)
251            connection.close()
252            self.assertTrue(connection.is_closed)
253
254        self.assertTrue(connection.is_closed)
255
256
257class TestConnectionContextManagerClosesConnectionAndPassesOriginalException(BlockingTestCaseBase):
258    def test(self):
259        """BlockingConnection: connection context manager closes connection and passes original exception"""  # pylint: disable=C0301
260        class MyException(Exception):
261            pass
262
263        with self.assertRaises(MyException):
264            with self._connect() as connection:
265                self.assertTrue(connection.is_open)
266
267                raise MyException()
268
269        self.assertTrue(connection.is_closed)
270
271
272class TestConnectionContextManagerClosesConnectionAndPassesSystemException(BlockingTestCaseBase):
273    def test(self):
274        """BlockingConnection: connection context manager closes connection and passes system exception"""  # pylint: disable=C0301
275        with self.assertRaises(SystemExit):
276            with self._connect() as connection:
277                self.assertTrue(connection.is_open)
278                raise SystemExit()
279
280        self.assertTrue(connection.is_closed)
281
282
283class TestLostConnectionResultsInIsClosedConnectionAndChannel(BlockingTestCaseBase):
284    def test(self):
285        connection = self._connect()
286        channel = connection.channel()
287
288        # Simulate the server dropping the socket connection
289        connection._impl._transport._sock.shutdown(socket.SHUT_RDWR)
290
291        with self.assertRaises(pika.exceptions.StreamLostError):
292            # Changing QoS should result in ConnectionClosed
293            channel.basic_qos()
294
295        # Now check is_open/is_closed on channel and connection
296        self.assertFalse(channel.is_open)
297        self.assertTrue(channel.is_closed)
298        self.assertFalse(connection.is_open)
299        self.assertTrue(connection.is_closed)
300
301
302class TestInvalidExchangeTypeRaisesConnectionClosed(BlockingTestCaseBase):
303    def test(self):
304        """BlockingConnection: ConnectionClosed raised when creating exchange with invalid type"""  # pylint: disable=C0301
305        # This test exploits behavior specific to RabbitMQ whereby the broker
306        # closes the connection if an attempt is made to declare an exchange
307        # with an invalid exchange type
308        connection = self._connect()
309        ch = connection.channel()
310
311        exg_name = ("TestInvalidExchangeTypeRaisesConnectionClosed_" +
312                    uuid.uuid1().hex)
313
314        with self.assertRaises(pika.exceptions.ConnectionClosed) as ex_cm:
315            # Attempt to create an exchange with invalid exchange type
316            ch.exchange_declare(exg_name, exchange_type='ZZwwInvalid')
317
318        self.assertEqual(ex_cm.exception.args[0], 503)
319
320
321class TestCreateAndCloseConnectionWithChannelAndConsumer(BlockingTestCaseBase):
322
323    def test(self):
324        """BlockingConnection: Create and close connection with channel and consumer"""  # pylint: disable=C0301
325        connection = self._connect()
326
327        ch = connection.channel()
328
329        q_name = (
330            'TestCreateAndCloseConnectionWithChannelAndConsumer_q' +
331            uuid.uuid1().hex)
332
333        body1 = 'a' * 1024
334
335        # Declare a new queue
336        ch.queue_declare(q_name, auto_delete=True)
337        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
338
339        # Publish the message to the queue by way of default exchange
340        ch.basic_publish(exchange='', routing_key=q_name, body=body1)
341
342        # Create a consumer that uses automatic ack mode
343        ch.basic_consume(q_name, lambda *x: None, auto_ack=True,
344                         exclusive=False, arguments=None)
345
346        connection.close()
347        self.assertTrue(connection.is_closed)
348        self.assertFalse(connection.is_open)
349        self.assertFalse(connection._impl.is_closing)
350
351        self.assertFalse(connection._impl._channels)
352
353        self.assertFalse(ch._consumer_infos)
354        self.assertFalse(ch._impl._consumers)
355
356
357class TestUsingInvalidQueueArgument(BlockingTestCaseBase):
358    def test(self):
359        """BlockingConnection raises expected exception when invalid queue parameter is used
360        """
361        connection = self._connect()
362        ch = connection.channel()
363        with self.assertRaises(TypeError):
364            ch.queue_declare(queue=[1, 2, 3])
365
366
367class TestSuddenBrokerDisconnectBeforeChannel(BlockingTestCaseBase):
368
369    def test(self):
370        """BlockingConnection resets properly on TCP/IP drop during channel()
371        """
372        with ForwardServer(remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port),
373                           local_linger_args=(1, 0)) as fwd:
374
375            self.connection = self._connect(
376                PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]})
377
378        # Once outside the context, the connection is broken
379
380        # BlockingConnection should raise ConnectionClosed
381        with self.assertRaises(pika.exceptions.StreamLostError):
382            self.connection.channel()
383
384        self.assertTrue(self.connection.is_closed)
385        self.assertFalse(self.connection.is_open)
386        self.assertIsNone(self.connection._impl._transport)
387
388
389class TestNoAccessToConnectionAfterConnectionLost(BlockingTestCaseBase):
390
391    def test(self):
392        """BlockingConnection no access file descriptor after StreamLostError
393        """
394        with ForwardServer(remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port),
395                           local_linger_args=(1, 0)) as fwd:
396
397            self.connection = self._connect(
398                PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]})
399
400        # Once outside the context, the connection is broken
401
402        # BlockingConnection should raise ConnectionClosed
403        with self.assertRaises(pika.exceptions.StreamLostError):
404            self.connection.channel()
405
406        self.assertTrue(self.connection.is_closed)
407        self.assertFalse(self.connection.is_open)
408        self.assertIsNone(self.connection._impl._transport)
409
410        # Attempt to operate on the connection once again after ConnectionClosed
411        with self.assertRaises(pika.exceptions.ConnectionWrongStateError):
412            self.connection.channel()
413
414
415class TestConnectWithDownedBroker(BlockingTestCaseBase):
416
417    def test(self):
418        """ BlockingConnection to downed broker results in AMQPConnectionError
419
420        """
421        # Reserve a port for use in connect
422        sock = socket.socket()
423        self.addCleanup(sock.close)
424
425        sock.bind(('127.0.0.1', 0))
426
427        port = sock.getsockname()[1]
428
429        sock.close()
430
431        with self.assertRaises(pika.exceptions.AMQPConnectionError):
432            self.connection = self._connect(
433                PARAMS_URL_TEMPLATE % {"port": port})
434
435
436class TestDisconnectDuringConnectionStart(BlockingTestCaseBase):
437
438    def test(self):
439        """ BlockingConnection TCP/IP connection loss in CONNECTION_START
440        """
441        fwd = ForwardServer(
442            remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port),
443            local_linger_args=(1, 0))
444
445        fwd.start()
446        self.addCleanup(lambda: fwd.stop() if fwd.running else None)
447
448        class MySelectConnection(pika.SelectConnection):
449            assert hasattr(pika.SelectConnection, '_on_connection_start')
450
451            def _on_connection_start(self, *args, **kwargs):  # pylint: disable=W0221
452                fwd.stop()
453                return super(MySelectConnection, self)._on_connection_start(
454                    *args, **kwargs)
455
456        with self.assertRaises(pika.exceptions.ProbableAuthenticationError):
457            self._connect(
458                PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]},
459                impl_class=MySelectConnection)
460
461
462class TestDisconnectDuringConnectionTune(BlockingTestCaseBase):
463
464    def test(self):
465        """ BlockingConnection TCP/IP connection loss in CONNECTION_TUNE
466        """
467        fwd = ForwardServer(
468            remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port),
469            local_linger_args=(1, 0))
470        fwd.start()
471        self.addCleanup(lambda: fwd.stop() if fwd.running else None)
472
473        class MySelectConnection(pika.SelectConnection):
474            assert hasattr(pika.SelectConnection, '_on_connection_tune')
475
476            def _on_connection_tune(self, *args, **kwargs):  # pylint: disable=W0221
477                fwd.stop()
478                return super(MySelectConnection, self)._on_connection_tune(
479                    *args, **kwargs)
480
481        with self.assertRaises(pika.exceptions.ProbableAccessDeniedError):
482            self._connect(
483                PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]},
484                impl_class=MySelectConnection)
485
486
487class TestDisconnectDuringConnectionProtocol(BlockingTestCaseBase):
488
489    def test(self):
490        """ BlockingConnection TCP/IP connection loss in CONNECTION_PROTOCOL
491        """
492        fwd = ForwardServer(
493            remote_addr=(DEFAULT_PARAMS.host, DEFAULT_PARAMS.port),
494            local_linger_args=(1, 0))
495
496        fwd.start()
497        self.addCleanup(lambda: fwd.stop() if fwd.running else None)
498
499        class MySelectConnection(pika.SelectConnection):
500            assert hasattr(pika.SelectConnection, '_on_stream_connected')
501
502            def _on_stream_connected(self, *args, **kwargs):  # pylint: disable=W0221
503                fwd.stop()
504                return super(MySelectConnection, self)._on_stream_connected(
505                    *args, **kwargs)
506
507        with self.assertRaises(pika.exceptions.IncompatibleProtocolError):
508            self._connect(PARAMS_URL_TEMPLATE % {"port": fwd.server_address[1]},
509                          impl_class=MySelectConnection)
510
511
512class TestProcessDataEvents(BlockingTestCaseBase):
513
514    def test(self):
515        """BlockingConnection.process_data_events"""
516        connection = self._connect()
517
518        # Try with time_limit=0
519        start_time = time_now()
520        connection.process_data_events(time_limit=0)
521        elapsed = time_now() - start_time
522        self.assertLess(elapsed, 0.25)
523
524        # Try with time_limit=0.005
525        start_time = time_now()
526        connection.process_data_events(time_limit=0.005)
527        elapsed = time_now() - start_time
528        self.assertGreaterEqual(elapsed, 0.005)
529        self.assertLess(elapsed, 0.25)
530
531
532class TestConnectionRegisterForBlockAndUnblock(BlockingTestCaseBase):
533
534    def test(self):
535        """BlockingConnection register for Connection.Blocked/Unblocked"""
536        connection = self._connect()
537
538        # NOTE: I haven't figured out yet how to coerce RabbitMQ to emit
539        # Connection.Block and Connection.Unblock from the test, so we'll
540        # just call the registration functions for now and simulate incoming
541        # blocked/unblocked frames
542
543        blocked_buffer = []
544        connection.add_on_connection_blocked_callback(
545            lambda conn, frame: blocked_buffer.append((conn, frame)))
546        # Simulate dispatch of blocked connection
547        blocked_frame = pika.frame.Method(
548            0,
549            pika.spec.Connection.Blocked('reason'))
550        connection._impl._process_frame(blocked_frame)
551        connection.sleep(0) # facilitate dispatch of pending events
552        self.assertEqual(len(blocked_buffer), 1)
553        conn, frame = blocked_buffer[0]
554        self.assertIs(conn, connection)
555        self.assertIs(frame, blocked_frame)
556
557        unblocked_buffer = []
558        connection.add_on_connection_unblocked_callback(
559            lambda conn, frame: unblocked_buffer.append((conn, frame)))
560        # Simulate dispatch of unblocked connection
561        unblocked_frame = pika.frame.Method(0, pika.spec.Connection.Unblocked())
562        connection._impl._process_frame(unblocked_frame)
563        connection.sleep(0) # facilitate dispatch of pending events
564        self.assertEqual(len(unblocked_buffer), 1)
565        conn, frame = unblocked_buffer[0]
566        self.assertIs(conn, connection)
567        self.assertIs(frame, unblocked_frame)
568
569
570class TestBlockedConnectionTimeout(BlockingTestCaseBase):
571
572    def test(self):
573        """BlockingConnection Connection.Blocked timeout """
574        url = DEFAULT_URL + '&blocked_connection_timeout=0.001'
575        conn = self._connect(url=url)
576
577        # NOTE: I haven't figured out yet how to coerce RabbitMQ to emit
578        # Connection.Block and Connection.Unblock from the test, so we'll
579        # simulate it for now
580
581        # Simulate Connection.Blocked
582        conn._impl._on_connection_blocked(
583            conn._impl,
584            pika.frame.Method(
585                0,
586                pika.spec.Connection.Blocked('TestBlockedConnectionTimeout')))
587
588        # Wait for connection teardown
589        with self.assertRaises(pika.exceptions.ConnectionBlockedTimeout):
590            while True:
591                conn.process_data_events(time_limit=1)
592
593
594class TestAddCallbackThreadsafeFromSameThread(BlockingTestCaseBase):
595
596    def test(self):
597        """BlockingConnection.add_callback_threadsafe from same thread"""
598        connection = self._connect()
599
600        # Test timer completion
601        start_time = time_now()
602        rx_callback = []
603        connection.add_callback_threadsafe(
604            lambda: rx_callback.append(time_now()))
605        while not rx_callback:
606            connection.process_data_events(time_limit=None)
607
608        self.assertEqual(len(rx_callback), 1)
609        elapsed = time_now() - start_time
610        self.assertLess(elapsed, 0.25)
611
612
613class TestAddCallbackThreadsafeFromAnotherThread(BlockingTestCaseBase):
614
615    def test(self):
616        """BlockingConnection.add_callback_threadsafe from another thread"""
617        connection = self._connect()
618
619        # Test timer completion
620        start_time = time_now()
621        rx_callback = []
622        timer = threading.Timer(
623            0,
624            functools.partial(connection.add_callback_threadsafe,
625                              lambda: rx_callback.append(time_now())))
626        self.addCleanup(timer.cancel)
627        timer.start()
628        while not rx_callback:
629            connection.process_data_events(time_limit=None)
630
631        self.assertEqual(len(rx_callback), 1)
632        elapsed = time_now() - start_time
633        self.assertLess(elapsed, 0.25)
634
635
636class TestAddCallbackThreadsafeOnClosedConnectionRaisesWrongState(
637        BlockingTestCaseBase):
638
639    def test(self):
640        """BlockingConnection.add_callback_threadsafe on closed connection raises ConnectionWrongStateError"""
641        connection = self._connect()
642        connection.close()
643
644        with self.assertRaises(pika.exceptions.ConnectionWrongStateError):
645            connection.add_callback_threadsafe(lambda: None)
646
647
648class TestAddTimeoutRemoveTimeout(BlockingTestCaseBase):
649
650    def test(self):
651        """BlockingConnection.call_later and remove_timeout"""
652        connection = self._connect()
653
654        # Test timer completion
655        start_time = time_now()
656        rx_callback = []
657        timer_id = connection.call_later(
658            0.005,
659            lambda: rx_callback.append(time_now()))
660        while not rx_callback:
661            connection.process_data_events(time_limit=None)
662
663        self.assertEqual(len(rx_callback), 1)
664        elapsed = time_now() - start_time
665        self.assertLess(elapsed, 0.25)
666
667        # Test removing triggered timeout
668        connection.remove_timeout(timer_id)
669
670
671        # Test aborted timer
672        rx_callback = []
673        timer_id = connection.call_later(
674            0.001,
675            lambda: rx_callback.append(time_now()))
676        connection.remove_timeout(timer_id)
677        connection.process_data_events(time_limit=0.1)
678        self.assertFalse(rx_callback)
679
680        # Make sure _TimerEvt repr doesn't crash
681        evt = blocking_connection._TimerEvt(lambda: None)
682        repr(evt)
683
684
685class TestViabilityOfMultipleTimeoutsWithSameDeadlineAndCallback(BlockingTestCaseBase):
686
687    def test(self):
688        """BlockingConnection viability of multiple timeouts with same deadline and callback"""
689        connection = self._connect()
690
691        rx_callback = []
692
693        def callback():
694            rx_callback.append(1)
695
696        timer1 = connection.call_later(0, callback)
697        timer2 = connection.call_later(0, callback)
698
699        self.assertIsNot(timer1, timer2)
700
701        connection.remove_timeout(timer1)
702
703        # Wait for second timer to fire
704        start_wait_time = time_now()
705        while not rx_callback and time_now() - start_wait_time < 0.25:
706            connection.process_data_events(time_limit=0.001)
707
708        self.assertListEqual(rx_callback, [1])
709
710
711class TestRemoveTimeoutFromTimeoutCallback(BlockingTestCaseBase):
712
713    def test(self):
714        """BlockingConnection.remove_timeout from timeout callback"""
715        connection = self._connect()
716
717        # Test timer completion
718        timer_id1 = connection.call_later(5, lambda: 0/0)
719
720        rx_timer2 = []
721        def on_timer2():
722            connection.remove_timeout(timer_id1)
723            connection.remove_timeout(timer_id2)
724            rx_timer2.append(1)
725
726        timer_id2 = connection.call_later(0, on_timer2)
727
728        while not rx_timer2:
729            connection.process_data_events(time_limit=None)
730
731        self.assertFalse(connection._ready_events)
732
733
734class TestSleep(BlockingTestCaseBase):
735
736    def test(self):
737        """BlockingConnection.sleep"""
738        connection = self._connect()
739
740        # Try with duration=0
741        start_time = time_now()
742        connection.sleep(duration=0)
743        elapsed = time_now() - start_time
744        self.assertLess(elapsed, 0.25)
745
746        # Try with duration=0.005
747        start_time = time_now()
748        connection.sleep(duration=0.005)
749        elapsed = time_now() - start_time
750        self.assertGreaterEqual(elapsed, 0.005)
751        self.assertLess(elapsed, 0.25)
752
753
754class TestConnectionProperties(BlockingTestCaseBase):
755
756    def test(self):
757        """Test BlockingConnection properties"""
758        connection = self._connect()
759
760        self.assertTrue(connection.is_open)
761        self.assertFalse(connection._impl.is_closing)
762        self.assertFalse(connection.is_closed)
763
764        self.assertTrue(connection.basic_nack_supported)
765        self.assertTrue(connection.consumer_cancel_notify_supported)
766        self.assertTrue(connection.exchange_exchange_bindings_supported)
767        self.assertTrue(connection.publisher_confirms_supported)
768
769        connection.close()
770        self.assertFalse(connection.is_open)
771        self.assertFalse(connection._impl.is_closing)
772        self.assertTrue(connection.is_closed)
773
774
775
776class TestCreateAndCloseChannel(BlockingTestCaseBase):
777
778    def test(self):
779        """BlockingChannel: Create and close channel"""
780        connection = self._connect()
781
782        ch = connection.channel()
783        self.assertIsInstance(ch, blocking_connection.BlockingChannel)
784        self.assertTrue(ch.is_open)
785        self.assertFalse(ch.is_closed)
786        self.assertFalse(ch._impl.is_closing)
787        self.assertIs(ch.connection, connection)
788
789        ch.close()
790        self.assertTrue(ch.is_closed)
791        self.assertFalse(ch.is_open)
792        self.assertFalse(ch._impl.is_closing)
793
794
795class TestExchangeDeclareAndDelete(BlockingTestCaseBase):
796
797    def test(self):
798        """BlockingChannel: Test exchange_declare and exchange_delete"""
799        connection = self._connect()
800
801        ch = connection.channel()
802
803        name = "TestExchangeDeclareAndDelete_" + uuid.uuid1().hex
804
805        # Declare a new exchange
806        frame = ch.exchange_declare(name, exchange_type='direct')
807        self.addCleanup(connection.channel().exchange_delete, name)
808
809        self.assertIsInstance(frame.method, pika.spec.Exchange.DeclareOk)
810
811        # Check if it exists by declaring it passively
812        frame = ch.exchange_declare(name, passive=True)
813        self.assertIsInstance(frame.method, pika.spec.Exchange.DeclareOk)
814
815        # Delete the exchange
816        frame = ch.exchange_delete(name)
817        self.assertIsInstance(frame.method, pika.spec.Exchange.DeleteOk)
818
819        # Verify that it's been deleted
820        with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as cm:
821            ch.exchange_declare(name, passive=True)
822
823        self.assertEqual(cm.exception.args[0], 404)
824
825
826class TestExchangeBindAndUnbind(BlockingTestCaseBase):
827
828    def test(self):
829        """BlockingChannel: Test exchange_bind and exchange_unbind"""
830        connection = self._connect()
831
832        ch = connection.channel()
833
834        q_name = 'TestExchangeBindAndUnbind_q' + uuid.uuid1().hex
835        src_exg_name = 'TestExchangeBindAndUnbind_src_exg_' + uuid.uuid1().hex
836        dest_exg_name = 'TestExchangeBindAndUnbind_dest_exg_' + uuid.uuid1().hex
837        routing_key = 'TestExchangeBindAndUnbind'
838
839        # Place channel in publisher-acknowledgments mode so that we may test
840        # whether the queue is reachable by publishing with mandatory=True
841        res = ch.confirm_delivery()
842        self.assertIsNone(res)
843
844        # Declare both exchanges
845        ch.exchange_declare(src_exg_name, exchange_type='direct')
846        self.addCleanup(connection.channel().exchange_delete, src_exg_name)
847        ch.exchange_declare(dest_exg_name, exchange_type='direct')
848        self.addCleanup(connection.channel().exchange_delete, dest_exg_name)
849
850        # Declare a new queue
851        ch.queue_declare(q_name, auto_delete=True)
852        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
853
854        # Bind the queue to the destination exchange
855        ch.queue_bind(q_name, exchange=dest_exg_name, routing_key=routing_key)
856
857        # Verify that the queue is unreachable without exchange-exchange binding
858        with self.assertRaises(pika.exceptions.UnroutableError):
859            ch.basic_publish(src_exg_name, routing_key, body='', mandatory=True)
860
861        # Bind the exchanges
862        frame = ch.exchange_bind(destination=dest_exg_name, source=src_exg_name,
863                                 routing_key=routing_key)
864        self.assertIsInstance(frame.method, pika.spec.Exchange.BindOk)
865
866        # Publish a message via the source exchange
867        ch.basic_publish(src_exg_name, routing_key, body='TestExchangeBindAndUnbind',
868                         mandatory=True)
869
870        # Check that the queue now has one message
871        self._assert_exact_message_count_with_retries(channel=ch,
872                                                      queue=q_name,
873                                                      expected_count=1)
874
875        # Unbind the exchanges
876        frame = ch.exchange_unbind(destination=dest_exg_name,
877                                   source=src_exg_name,
878                                   routing_key=routing_key)
879        self.assertIsInstance(frame.method, pika.spec.Exchange.UnbindOk)
880
881        # Verify that the queue is now unreachable via the source exchange
882        with self.assertRaises(pika.exceptions.UnroutableError):
883            ch.basic_publish(src_exg_name, routing_key, body='', mandatory=True)
884
885
886class TestQueueDeclareAndDelete(BlockingTestCaseBase):
887
888    def test(self):
889        """BlockingChannel: Test queue_declare and queue_delete"""
890        connection = self._connect()
891
892        ch = connection.channel()
893
894        q_name = 'TestQueueDeclareAndDelete_' + uuid.uuid1().hex
895
896        # Declare a new queue
897        frame = ch.queue_declare(q_name, auto_delete=True)
898        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
899
900        self.assertIsInstance(frame.method, pika.spec.Queue.DeclareOk)
901
902        # Check if it exists by declaring it passively
903        frame = ch.queue_declare(q_name, passive=True)
904        self.assertIsInstance(frame.method, pika.spec.Queue.DeclareOk)
905
906        # Delete the queue
907        frame = ch.queue_delete(q_name)
908        self.assertIsInstance(frame.method, pika.spec.Queue.DeleteOk)
909
910        # Verify that it's been deleted
911        with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as cm:
912            ch.queue_declare(q_name, passive=True)
913
914        self.assertEqual(cm.exception.args[0], 404)
915
916
917class TestPassiveQueueDeclareOfUnknownQueueRaisesChannelClosed(
918        BlockingTestCaseBase):
919    def test(self):
920        """BlockingChannel: ChannelClosed raised when passive-declaring unknown queue"""  # pylint: disable=C0301
921        connection = self._connect()
922        ch = connection.channel()
923
924        q_name = ("TestPassiveQueueDeclareOfUnknownQueueRaisesChannelClosed_q_"
925                  + uuid.uuid1().hex)
926
927        with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as ex_cm:
928            ch.queue_declare(q_name, passive=True)
929
930        self.assertEqual(ex_cm.exception.args[0], 404)
931
932
933class TestQueueBindAndUnbindAndPurge(BlockingTestCaseBase):
934
935    def test(self):
936        """BlockingChannel: Test queue_bind and queue_unbind"""
937        connection = self._connect()
938
939        ch = connection.channel()
940
941        q_name = 'TestQueueBindAndUnbindAndPurge_q' + uuid.uuid1().hex
942        exg_name = 'TestQueueBindAndUnbindAndPurge_exg_' + uuid.uuid1().hex
943        routing_key = 'TestQueueBindAndUnbindAndPurge'
944
945        # Place channel in publisher-acknowledgments mode so that we may test
946        # whether the queue is reachable by publishing with mandatory=True
947        res = ch.confirm_delivery()
948        self.assertIsNone(res)
949
950        # Declare a new exchange
951        ch.exchange_declare(exg_name, exchange_type='direct')
952        self.addCleanup(connection.channel().exchange_delete, exg_name)
953
954        # Declare a new queue
955        ch.queue_declare(q_name, auto_delete=True)
956        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
957
958        # Bind the queue to the exchange using routing key
959        frame = ch.queue_bind(q_name, exchange=exg_name,
960                              routing_key=routing_key)
961        self.assertIsInstance(frame.method, pika.spec.Queue.BindOk)
962
963        # Check that the queue is empty
964        frame = ch.queue_declare(q_name, passive=True)
965        self.assertEqual(frame.method.message_count, 0)
966
967        # Deposit a message in the queue
968        ch.basic_publish(exg_name, routing_key, body='TestQueueBindAndUnbindAndPurge',
969                         mandatory=True)
970
971        # Check that the queue now has one message
972        frame = ch.queue_declare(q_name, passive=True)
973        self.assertEqual(frame.method.message_count, 1)
974
975        # Unbind the queue
976        frame = ch.queue_unbind(queue=q_name, exchange=exg_name,
977                                routing_key=routing_key)
978        self.assertIsInstance(frame.method, pika.spec.Queue.UnbindOk)
979
980        # Verify that the queue is now unreachable via that binding
981        with self.assertRaises(pika.exceptions.UnroutableError):
982            ch.basic_publish(exg_name, routing_key,
983                             body='TestQueueBindAndUnbindAndPurge-2',
984                             mandatory=True)
985
986        # Purge the queue and verify that 1 message was purged
987        frame = ch.queue_purge(q_name)
988        self.assertIsInstance(frame.method, pika.spec.Queue.PurgeOk)
989        self.assertEqual(frame.method.message_count, 1)
990
991        # Verify that the queue is now empty
992        frame = ch.queue_declare(q_name, passive=True)
993        self.assertEqual(frame.method.message_count, 0)
994
995
996class TestBasicGet(BlockingTestCaseBase):
997
998    def tearDown(self):
999        LOGGER.info('%s TEARING DOWN (%s)', datetime.utcnow(), self)
1000
1001    def test(self):
1002        """BlockingChannel.basic_get"""
1003        LOGGER.info('%s STARTED (%s)', datetime.utcnow(), self)
1004
1005        connection = self._connect()
1006        LOGGER.info('%s CONNECTED (%s)', datetime.utcnow(), self)
1007
1008        ch = connection.channel()
1009        LOGGER.info('%s CREATED CHANNEL (%s)', datetime.utcnow(), self)
1010
1011        q_name = 'TestBasicGet_q' + uuid.uuid1().hex
1012
1013        # Place channel in publisher-acknowledgments mode so that the message
1014        # may be delivered synchronously to the queue by publishing it with
1015        # mandatory=True
1016        ch.confirm_delivery()
1017        LOGGER.info('%s ENABLED PUB-ACKS (%s)', datetime.utcnow(), self)
1018
1019        # Declare a new queue
1020        ch.queue_declare(q_name, auto_delete=True)
1021        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1022        LOGGER.info('%s DECLARED QUEUE (%s)', datetime.utcnow(), self)
1023
1024        # Verify result of getting a message from an empty queue
1025        msg = ch.basic_get(q_name, auto_ack=False)
1026        self.assertTupleEqual(msg, (None, None, None))
1027        LOGGER.info('%s GOT FROM EMPTY QUEUE (%s)', datetime.utcnow(), self)
1028
1029        body = 'TestBasicGet'
1030        # Deposit a message in the queue via default exchange
1031        ch.basic_publish(exchange='', routing_key=q_name,
1032                         body=body, mandatory=True)
1033        LOGGER.info('%s PUBLISHED (%s)', datetime.utcnow(), self)
1034
1035        # Get the message
1036        (method, properties, body) = ch.basic_get(q_name, auto_ack=False)
1037        LOGGER.info('%s GOT FROM NON-EMPTY QUEUE (%s)', datetime.utcnow(), self)
1038        self.assertIsInstance(method, pika.spec.Basic.GetOk)
1039        self.assertEqual(method.delivery_tag, 1)
1040        self.assertFalse(method.redelivered)
1041        self.assertEqual(method.exchange, '')
1042        self.assertEqual(method.routing_key, q_name)
1043        self.assertEqual(method.message_count, 0)
1044
1045        self.assertIsInstance(properties, pika.BasicProperties)
1046        self.assertIsNone(properties.headers)
1047        self.assertEqual(body, as_bytes(body))
1048
1049        # Ack it
1050        ch.basic_ack(delivery_tag=method.delivery_tag)
1051        LOGGER.info('%s ACKED (%s)', datetime.utcnow(), self)
1052
1053        # Verify that the queue is now empty
1054        self._assert_exact_message_count_with_retries(channel=ch,
1055                                                      queue=q_name,
1056                                                      expected_count=0)
1057
1058
1059class TestBasicReject(BlockingTestCaseBase):
1060
1061    def test(self):
1062        """BlockingChannel.basic_reject"""
1063        connection = self._connect()
1064
1065        ch = connection.channel()
1066
1067        q_name = 'TestBasicReject_q' + uuid.uuid1().hex
1068
1069        # Place channel in publisher-acknowledgments mode so that the message
1070        # may be delivered synchronously to the queue by publishing it with
1071        # mandatory=True
1072        ch.confirm_delivery()
1073
1074        # Declare a new queue
1075        ch.queue_declare(q_name, auto_delete=True)
1076        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1077
1078        # Deposit two messages in the queue via default exchange
1079        ch.basic_publish(exchange='', routing_key=q_name,
1080                         body='TestBasicReject1', mandatory=True)
1081        ch.basic_publish(exchange='', routing_key=q_name,
1082                         body='TestBasicReject2', mandatory=True)
1083
1084        # Get the messages
1085        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1086        self.assertEqual(rx_body, as_bytes('TestBasicReject1'))
1087
1088        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1089        self.assertEqual(rx_body, as_bytes('TestBasicReject2'))
1090
1091        # Nack the second message
1092        ch.basic_reject(rx_method.delivery_tag, requeue=True)
1093
1094        # Verify that exactly one message is present in the queue, namely the
1095        # second one
1096        self._assert_exact_message_count_with_retries(channel=ch,
1097                                                      queue=q_name,
1098                                                      expected_count=1)
1099        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1100        self.assertEqual(rx_body, as_bytes('TestBasicReject2'))
1101
1102
1103class TestBasicRejectNoRequeue(BlockingTestCaseBase):
1104
1105    def test(self):
1106        """BlockingChannel.basic_reject with requeue=False"""
1107        connection = self._connect()
1108
1109        ch = connection.channel()
1110
1111        q_name = 'TestBasicRejectNoRequeue_q' + uuid.uuid1().hex
1112
1113        # Place channel in publisher-acknowledgments mode so that the message
1114        # may be delivered synchronously to the queue by publishing it with
1115        # mandatory=True
1116        ch.confirm_delivery()
1117
1118        # Declare a new queue
1119        ch.queue_declare(q_name, auto_delete=True)
1120        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1121
1122        # Deposit two messages in the queue via default exchange
1123        ch.basic_publish(exchange='', routing_key=q_name,
1124                         body='TestBasicRejectNoRequeue1', mandatory=True)
1125        ch.basic_publish(exchange='', routing_key=q_name,
1126                         body='TestBasicRejectNoRequeue2', mandatory=True)
1127
1128        # Get the messages
1129        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1130        self.assertEqual(rx_body,
1131                         as_bytes('TestBasicRejectNoRequeue1'))
1132
1133        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1134        self.assertEqual(rx_body,
1135                         as_bytes('TestBasicRejectNoRequeue2'))
1136
1137        # Nack the second message
1138        ch.basic_reject(rx_method.delivery_tag, requeue=False)
1139
1140        # Verify that no messages are present in the queue
1141        self._assert_exact_message_count_with_retries(channel=ch,
1142                                                      queue=q_name,
1143                                                      expected_count=0)
1144
1145
1146class TestBasicNack(BlockingTestCaseBase):
1147
1148    def test(self):
1149        """BlockingChannel.basic_nack single message"""
1150        connection = self._connect()
1151
1152        ch = connection.channel()
1153
1154        q_name = 'TestBasicNack_q' + uuid.uuid1().hex
1155
1156        # Place channel in publisher-acknowledgments mode so that the message
1157        # may be delivered synchronously to the queue by publishing it with
1158        # mandatory=True
1159        ch.confirm_delivery()
1160
1161        # Declare a new queue
1162        ch.queue_declare(q_name, auto_delete=True)
1163        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1164
1165        # Deposit two messages in the queue via default exchange
1166        ch.basic_publish(exchange='', routing_key=q_name,
1167                         body='TestBasicNack1', mandatory=True)
1168        ch.basic_publish(exchange='', routing_key=q_name,
1169                         body='TestBasicNack2', mandatory=True)
1170
1171        # Get the messages
1172        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1173        self.assertEqual(rx_body, as_bytes('TestBasicNack1'))
1174
1175        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1176        self.assertEqual(rx_body, as_bytes('TestBasicNack2'))
1177
1178        # Nack the second message
1179        ch.basic_nack(rx_method.delivery_tag, multiple=False, requeue=True)
1180
1181        # Verify that exactly one message is present in the queue, namely the
1182        # second one
1183        self._assert_exact_message_count_with_retries(channel=ch,
1184                                                      queue=q_name,
1185                                                      expected_count=1)
1186        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1187        self.assertEqual(rx_body, as_bytes('TestBasicNack2'))
1188
1189
1190class TestBasicNackNoRequeue(BlockingTestCaseBase):
1191
1192    def test(self):
1193        """BlockingChannel.basic_nack with requeue=False"""
1194        connection = self._connect()
1195
1196        ch = connection.channel()
1197
1198        q_name = 'TestBasicNackNoRequeue_q' + uuid.uuid1().hex
1199
1200        # Place channel in publisher-acknowledgments mode so that the message
1201        # may be delivered synchronously to the queue by publishing it with
1202        # mandatory=True
1203        ch.confirm_delivery()
1204
1205        # Declare a new queue
1206        ch.queue_declare(q_name, auto_delete=True)
1207        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1208
1209        # Deposit two messages in the queue via default exchange
1210        ch.basic_publish(exchange='', routing_key=q_name,
1211                         body='TestBasicNackNoRequeue1', mandatory=True)
1212        ch.basic_publish(exchange='', routing_key=q_name,
1213                         body='TestBasicNackNoRequeue2', mandatory=True)
1214
1215        # Get the messages
1216        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1217        self.assertEqual(rx_body,
1218                         as_bytes('TestBasicNackNoRequeue1'))
1219
1220        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1221        self.assertEqual(rx_body,
1222                         as_bytes('TestBasicNackNoRequeue2'))
1223
1224        # Nack the second message
1225        ch.basic_nack(rx_method.delivery_tag, requeue=False)
1226
1227        # Verify that no messages are present in the queue
1228        self._assert_exact_message_count_with_retries(channel=ch,
1229                                                      queue=q_name,
1230                                                      expected_count=0)
1231
1232
1233class TestBasicNackMultiple(BlockingTestCaseBase):
1234
1235    def test(self):
1236        """BlockingChannel.basic_nack multiple messages"""
1237        connection = self._connect()
1238
1239        ch = connection.channel()
1240
1241        q_name = 'TestBasicNackMultiple_q' + uuid.uuid1().hex
1242
1243        # Place channel in publisher-acknowledgments mode so that the message
1244        # may be delivered synchronously to the queue by publishing it with
1245        # mandatory=True
1246        ch.confirm_delivery()
1247
1248        # Declare a new queue
1249        ch.queue_declare(q_name, auto_delete=True)
1250        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1251
1252        # Deposit two messages in the queue via default exchange
1253        ch.basic_publish(exchange='', routing_key=q_name,
1254                         body='TestBasicNackMultiple1', mandatory=True)
1255        ch.basic_publish(exchange='', routing_key=q_name,
1256                         body='TestBasicNackMultiple2', mandatory=True)
1257
1258        # Get the messages
1259        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1260        self.assertEqual(rx_body,
1261                         as_bytes('TestBasicNackMultiple1'))
1262
1263        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1264        self.assertEqual(rx_body,
1265                         as_bytes('TestBasicNackMultiple2'))
1266
1267        # Nack both messages via the "multiple" option
1268        ch.basic_nack(rx_method.delivery_tag, multiple=True, requeue=True)
1269
1270        # Verify that both messages are present in the queue
1271        self._assert_exact_message_count_with_retries(channel=ch,
1272                                                      queue=q_name,
1273                                                      expected_count=2)
1274        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1275        self.assertEqual(rx_body,
1276                         as_bytes('TestBasicNackMultiple1'))
1277        (rx_method, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1278        self.assertEqual(rx_body,
1279                         as_bytes('TestBasicNackMultiple2'))
1280
1281
1282class TestBasicRecoverWithRequeue(BlockingTestCaseBase):
1283
1284    def test(self):
1285        """BlockingChannel.basic_recover with requeue=True.
1286
1287        NOTE: the requeue=False option is not supported by RabbitMQ broker as
1288        of this writing (using RabbitMQ 3.5.1)
1289        """
1290        connection = self._connect()
1291
1292        ch = connection.channel()
1293
1294        q_name = (
1295            'TestBasicRecoverWithRequeue_q' + uuid.uuid1().hex)
1296
1297        # Place channel in publisher-acknowledgments mode so that the message
1298        # may be delivered synchronously to the queue by publishing it with
1299        # mandatory=True
1300        ch.confirm_delivery()
1301
1302        # Declare a new queue
1303        ch.queue_declare(q_name, auto_delete=True)
1304        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1305
1306        # Deposit two messages in the queue via default exchange
1307        ch.basic_publish(exchange='', routing_key=q_name,
1308                         body='TestBasicRecoverWithRequeue1', mandatory=True)
1309        ch.basic_publish(exchange='', routing_key=q_name,
1310                         body='TestBasicRecoverWithRequeue2', mandatory=True)
1311
1312        rx_messages = []
1313        num_messages = 0
1314        for msg in ch.consume(q_name, auto_ack=False):
1315            num_messages += 1
1316
1317            if num_messages == 2:
1318                ch.basic_recover(requeue=True)
1319
1320            if num_messages > 2:
1321                rx_messages.append(msg)
1322
1323            if num_messages == 4:
1324                break
1325        else:
1326            self.fail('consumer aborted prematurely')
1327
1328        # Get the messages
1329        (_, _, rx_body) = rx_messages[0]
1330        self.assertEqual(rx_body,
1331                         as_bytes('TestBasicRecoverWithRequeue1'))
1332
1333        (_, _, rx_body) = rx_messages[1]
1334        self.assertEqual(rx_body,
1335                         as_bytes('TestBasicRecoverWithRequeue2'))
1336
1337
1338class TestTxCommit(BlockingTestCaseBase):
1339
1340    def test(self):
1341        """BlockingChannel.tx_commit"""
1342        connection = self._connect()
1343
1344        ch = connection.channel()
1345
1346        q_name = 'TestTxCommit_q' + uuid.uuid1().hex
1347
1348        # Declare a new queue
1349        ch.queue_declare(q_name, auto_delete=True)
1350        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1351
1352        # Select standard transaction mode
1353        frame = ch.tx_select()
1354        self.assertIsInstance(frame.method, pika.spec.Tx.SelectOk)
1355
1356        # Deposit a message in the queue via default exchange
1357        ch.basic_publish(exchange='', routing_key=q_name,
1358                         body='TestTxCommit1', mandatory=True)
1359
1360        # Verify that queue is still empty
1361        frame = ch.queue_declare(q_name, passive=True)
1362        self.assertEqual(frame.method.message_count, 0)
1363
1364        # Commit the transaction
1365        ch.tx_commit()
1366
1367        # Verify that the queue has the expected message
1368        frame = ch.queue_declare(q_name, passive=True)
1369        self.assertEqual(frame.method.message_count, 1)
1370
1371        (_, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1372        self.assertEqual(rx_body, as_bytes('TestTxCommit1'))
1373
1374
1375class TestTxRollback(BlockingTestCaseBase):
1376
1377    def test(self):
1378        """BlockingChannel.tx_commit"""
1379        connection = self._connect()
1380
1381        ch = connection.channel()
1382
1383        q_name = 'TestTxRollback_q' + uuid.uuid1().hex
1384
1385        # Declare a new queue
1386        ch.queue_declare(q_name, auto_delete=True)
1387        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1388
1389        # Select standard transaction mode
1390        frame = ch.tx_select()
1391        self.assertIsInstance(frame.method, pika.spec.Tx.SelectOk)
1392
1393        # Deposit a message in the queue via default exchange
1394        ch.basic_publish(exchange='', routing_key=q_name,
1395                         body='TestTxRollback1', mandatory=True)
1396
1397        # Verify that queue is still empty
1398        frame = ch.queue_declare(q_name, passive=True)
1399        self.assertEqual(frame.method.message_count, 0)
1400
1401        # Roll back the transaction
1402        ch.tx_rollback()
1403
1404        # Verify that the queue continues to be empty
1405        frame = ch.queue_declare(q_name, passive=True)
1406        self.assertEqual(frame.method.message_count, 0)
1407
1408
1409class TestBasicConsumeFromUnknownQueueRaisesChannelClosed(BlockingTestCaseBase):
1410    def test(self):
1411        """ChannelClosed raised when consuming from unknown queue"""
1412        connection = self._connect()
1413        ch = connection.channel()
1414
1415        q_name = ("TestBasicConsumeFromUnknownQueueRaisesChannelClosed_q_" +
1416                  uuid.uuid1().hex)
1417
1418        with self.assertRaises(pika.exceptions.ChannelClosedByBroker) as ex_cm:
1419            ch.basic_consume(q_name, lambda *args: None)
1420
1421        self.assertEqual(ex_cm.exception.args[0], 404)
1422
1423
1424class TestPublishAndBasicPublishWithPubacksUnroutable(BlockingTestCaseBase):
1425
1426    def test(self):  # pylint: disable=R0914
1427        """BlockingChannel.publish amd basic_publish unroutable message with pubacks"""  # pylint: disable=C0301
1428        connection = self._connect()
1429
1430        ch = connection.channel()
1431
1432        exg_name = ('TestPublishAndBasicPublishUnroutable_exg_' +
1433                    uuid.uuid1().hex)
1434        routing_key = 'TestPublishAndBasicPublishUnroutable'
1435
1436        # Place channel in publisher-acknowledgments mode so that publishing
1437        # with mandatory=True will be synchronous
1438        res = ch.confirm_delivery()
1439        self.assertIsNone(res)
1440
1441        # Declare a new exchange
1442        ch.exchange_declare(exg_name, exchange_type='direct')
1443        self.addCleanup(connection.channel().exchange_delete, exg_name)
1444
1445        # Verify unroutable message handling using basic_publish
1446        msg2_headers = dict(
1447            test_name='TestPublishAndBasicPublishWithPubacksUnroutable')
1448        msg2_properties = pika.spec.BasicProperties(headers=msg2_headers)
1449        with self.assertRaises(pika.exceptions.UnroutableError) as cm:
1450            ch.basic_publish(exg_name, routing_key=routing_key, body='',
1451                             properties=msg2_properties, mandatory=True)
1452        (msg,) = cm.exception.messages
1453        self.assertIsInstance(msg, blocking_connection.ReturnedMessage)
1454        self.assertIsInstance(msg.method, pika.spec.Basic.Return)
1455        self.assertEqual(msg.method.reply_code, 312)
1456        self.assertEqual(msg.method.exchange, exg_name)
1457        self.assertEqual(msg.method.routing_key, routing_key)
1458        self.assertIsInstance(msg.properties, pika.BasicProperties)
1459        self.assertEqual(msg.properties.headers, msg2_headers)
1460        self.assertEqual(msg.body, as_bytes(''))
1461
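
# With delivery confirmations enabled, a mandatory publish that the broker
# cannot route is reported synchronously as UnroutableError, which is what
# the test above asserts. The sketch below illustrates the same flow in
# isolation; it is not used by the tests and assumes pika connection
# parameters for a reachable broker plus an existing exchange that has no
# binding for the given routing key.
def _publish_with_confirms_sketch(params, exchange, routing_key, body):
    """Publish with publisher confirms; return True if the message was
    routed, False if the broker returned it as unroutable.
    Illustrative sketch only.
    """
    with pika.BlockingConnection(params) as connection:
        channel = connection.channel()
        # Publisher-acknowledgments mode makes mandatory publishes synchronous
        channel.confirm_delivery()
        try:
            channel.basic_publish(exchange, routing_key, body, mandatory=True)
        except pika.exceptions.UnroutableError as error:
            # error.messages holds the ReturnedMessage instances
            LOGGER.debug('Publish was returned as unroutable: %r',
                         error.messages)
            return False
        return True
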
1462
1463class TestConfirmDeliveryAfterUnroutableMessage(BlockingTestCaseBase):
1464
1465    def test(self):  # pylint: disable=R0914
1466        """BlockingChannel.confirm_delivery following unroutable message"""
1467        connection = self._connect()
1468
1469        ch = connection.channel()
1470
1471        exg_name = ('TestConfirmDeliveryAfterUnroutableMessage_exg_' +
1472                    uuid.uuid1().hex)
1473        routing_key = 'TestConfirmDeliveryAfterUnroutableMessage'
1474
1475        # Declare a new exchange
1476        ch.exchange_declare(exg_name, exchange_type='direct')
1477        self.addCleanup(connection.channel().exchange_delete, exg_name)
1478
1479        # Register on-return callback
1480        returned_messages = []
1481        ch.add_on_return_callback(lambda *args: returned_messages.append(args))
1482
1483        # Emit unroutable message without pubacks
1484        ch.basic_publish(exg_name, routing_key=routing_key, body='', mandatory=True)
1485
1486        # Select delivery confirmations
1487        ch.confirm_delivery()
1488
1489        # Verify that unroutable message is in pending events
1490        self.assertEqual(len(ch._pending_events), 1)
1491        self.assertIsInstance(ch._pending_events[0],
1492                              blocking_connection._ReturnedMessageEvt)
        # Verify that repr of the _ReturnedMessageEvt instance does not crash
1494        repr(ch._pending_events[0])
1495
        # Dispatch events
1497        connection.process_data_events()
1498
1499        self.assertEqual(len(ch._pending_events), 0)
1500
1501        # Verify that unroutable message was dispatched
1502        ((channel, method, properties, body,),) = returned_messages  # pylint: disable=E0632
1503        self.assertIs(channel, ch)
1504        self.assertIsInstance(method, pika.spec.Basic.Return)
1505        self.assertEqual(method.reply_code, 312)
1506        self.assertEqual(method.exchange, exg_name)
1507        self.assertEqual(method.routing_key, routing_key)
1508        self.assertIsInstance(properties, pika.BasicProperties)
1509        self.assertEqual(body, as_bytes(''))
1510
1511
1512class TestUnroutableMessagesReturnedInNonPubackMode(BlockingTestCaseBase):
1513
1514    def test(self):  # pylint: disable=R0914
1515        """BlockingChannel: unroutable messages is returned in non-puback mode"""  # pylint: disable=C0301
1516        connection = self._connect()
1517
1518        ch = connection.channel()
1519
1520        exg_name = (
1521            'TestUnroutableMessageReturnedInNonPubackMode_exg_'
1522            + uuid.uuid1().hex)
1523        routing_key = 'TestUnroutableMessageReturnedInNonPubackMode'
1524
1525        # Declare a new exchange
1526        ch.exchange_declare(exg_name, exchange_type='direct')
1527        self.addCleanup(connection.channel().exchange_delete, exg_name)
1528
1529        # Register on-return callback
1530        returned_messages = []
1531        ch.add_on_return_callback(
1532            lambda *args: returned_messages.append(args))
1533
1534        # Emit unroutable messages without pubacks
1535        ch.basic_publish(exg_name, routing_key=routing_key, body='msg1', mandatory=True)
1536        ch.basic_publish(exg_name, routing_key=routing_key, body='msg2', mandatory=True)
1537
        # Process I/O until both Basic.Return frames are dispatched
1539        while len(returned_messages) < 2:
1540            connection.process_data_events()
1541
1542        self.assertEqual(len(returned_messages), 2)
1543
1544        self.assertEqual(len(ch._pending_events), 0)
1545
1546        # Verify returned messages
1547        (channel, method, properties, body,) = returned_messages[0]
1548        self.assertIs(channel, ch)
1549        self.assertIsInstance(method, pika.spec.Basic.Return)
1550        self.assertEqual(method.reply_code, 312)
1551        self.assertEqual(method.exchange, exg_name)
1552        self.assertEqual(method.routing_key, routing_key)
1553        self.assertIsInstance(properties, pika.BasicProperties)
1554        self.assertEqual(body, as_bytes('msg1'))
1555
1556        (channel, method, properties, body,) = returned_messages[1]
1557        self.assertIs(channel, ch)
1558        self.assertIsInstance(method, pika.spec.Basic.Return)
1559        self.assertEqual(method.reply_code, 312)
1560        self.assertEqual(method.exchange, exg_name)
1561        self.assertEqual(method.routing_key, routing_key)
1562        self.assertIsInstance(properties, pika.BasicProperties)
1563        self.assertEqual(body, as_bytes('msg2'))
1564
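
# Without publisher confirms there is no exception to catch: unroutable
# mandatory messages come back asynchronously as Basic.Return and are
# surfaced through add_on_return_callback once I/O is processed, as the
# test above shows. The sketch below is illustrative only; it is not used
# by the tests and assumes pika connection parameters for a reachable
# broker plus an existing exchange with no binding for the routing key.
def _collect_returned_messages_sketch(params, exchange, routing_key, bodies):
    """Publish `bodies` as mandatory messages without confirms and return
    the (method, properties, body) tuples the broker sent back.
    Illustrative sketch only.
    """
    returned = []
    with pika.BlockingConnection(params) as connection:
        channel = connection.channel()
        channel.add_on_return_callback(
            lambda _ch, method, properties, body:
            returned.append((method, properties, body)))
        for body in bodies:
            channel.basic_publish(exchange, routing_key, body, mandatory=True)
        # Pump I/O until every Basic.Return has been dispatched
        while len(returned) < len(bodies):
            connection.process_data_events(time_limit=1)
    return returned
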
1565
1566class TestUnroutableMessageReturnedInPubackMode(BlockingTestCaseBase):
1567
1568    def test(self):  # pylint: disable=R0914
1569        """BlockingChannel: unroutable messages is returned in puback mode"""
1570        connection = self._connect()
1571
1572        ch = connection.channel()
1573
1574        exg_name = (
1575            'TestUnroutableMessageReturnedInPubackMode_exg_'
1576            + uuid.uuid1().hex)
1577        routing_key = 'TestUnroutableMessageReturnedInPubackMode'
1578
1579        # Declare a new exchange
1580        ch.exchange_declare(exg_name, exchange_type='direct')
1581        self.addCleanup(connection.channel().exchange_delete, exg_name)
1582
1583        # Select delivery confirmations
1584        ch.confirm_delivery()
1585
1586        # Register on-return callback
1587        returned_messages = []
1588        ch.add_on_return_callback(
1589            lambda *args: returned_messages.append(args))
1590
1591        # Emit unroutable messages with pubacks
1592        with self.assertRaises(pika.exceptions.UnroutableError):
1593            ch.basic_publish(exg_name, routing_key=routing_key, body='msg1',
1594                             mandatory=True)
1595        with self.assertRaises(pika.exceptions.UnroutableError):
1596            ch.basic_publish(exg_name, routing_key=routing_key, body='msg2',
1597                             mandatory=True)
1598
1599        # Verify that unroutable messages are already in pending events
1600        self.assertEqual(len(ch._pending_events), 2)
1601        self.assertIsInstance(ch._pending_events[0],
1602                              blocking_connection._ReturnedMessageEvt)
1603        self.assertIsInstance(ch._pending_events[1],
1604                              blocking_connection._ReturnedMessageEvt)
        # Verify that repr of the _ReturnedMessageEvt instances does not crash
1606        repr(ch._pending_events[0])
1607        repr(ch._pending_events[1])
1608
1609        # Dispatch events
1610        connection.process_data_events()
1611
1612        self.assertEqual(len(ch._pending_events), 0)
1613
1614        # Verify returned messages
1615        (channel, method, properties, body,) = returned_messages[0]
1616        self.assertIs(channel, ch)
1617        self.assertIsInstance(method, pika.spec.Basic.Return)
1618        self.assertEqual(method.reply_code, 312)
1619        self.assertEqual(method.exchange, exg_name)
1620        self.assertEqual(method.routing_key, routing_key)
1621        self.assertIsInstance(properties, pika.BasicProperties)
1622        self.assertEqual(body, as_bytes('msg1'))
1623
1624        (channel, method, properties, body,) = returned_messages[1]
1625        self.assertIs(channel, ch)
1626        self.assertIsInstance(method, pika.spec.Basic.Return)
1627        self.assertEqual(method.reply_code, 312)
1628        self.assertEqual(method.exchange, exg_name)
1629        self.assertEqual(method.routing_key, routing_key)
1630        self.assertIsInstance(properties, pika.BasicProperties)
1631        self.assertEqual(body, as_bytes('msg2'))
1632
1633
1634class TestBasicPublishDeliveredWhenPendingUnroutable(BlockingTestCaseBase):
1635
1636    def test(self):  # pylint: disable=R0914
1637        """BlockingChannel.basic_publish msg delivered despite pending unroutable message"""  # pylint: disable=C0301
1638        connection = self._connect()
1639
1640        ch = connection.channel()
1641
1642        q_name = ('TestBasicPublishDeliveredWhenPendingUnroutable_q' +
1643                  uuid.uuid1().hex)
1644        exg_name = ('TestBasicPublishDeliveredWhenPendingUnroutable_exg_' +
1645                    uuid.uuid1().hex)
1646        routing_key = 'TestBasicPublishDeliveredWhenPendingUnroutable'
1647
1648        # Declare a new exchange
1649        ch.exchange_declare(exg_name, exchange_type='direct')
1650        self.addCleanup(connection.channel().exchange_delete, exg_name)
1651
1652        # Declare a new queue
1653        ch.queue_declare(q_name, auto_delete=True)
1654        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1655
1656        # Bind the queue to the exchange using routing key
1657        ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)
1658
        # Publish an unroutable message via basic_publish; the empty
        # routing key does not match the queue binding
1660        ch.basic_publish(exg_name, routing_key='',
1661                         body='unroutable-message',
1662                         mandatory=True)
1663
1664        # Flush connection to force Basic.Return
1665        connection.channel().close()
1666
1667        # Deposit a routable message in the queue
1668        ch.basic_publish(exg_name, routing_key=routing_key,
1669                         body='routable-message',
1670                         mandatory=True)
1671
1672        # Wait for the queue to get the routable message
1673        self._assert_exact_message_count_with_retries(channel=ch,
1674                                                      queue=q_name,
1675                                                      expected_count=1)
1676
1677        msg = ch.basic_get(q_name)
1678
1679        # Check the first message
1680        self.assertIsInstance(msg, tuple)
1681        rx_method, rx_properties, rx_body = msg
1682        self.assertIsInstance(rx_method, pika.spec.Basic.GetOk)
1683        self.assertEqual(rx_method.delivery_tag, 1)
1684        self.assertFalse(rx_method.redelivered)
1685        self.assertEqual(rx_method.exchange, exg_name)
1686        self.assertEqual(rx_method.routing_key, routing_key)
1687
1688        self.assertIsInstance(rx_properties, pika.BasicProperties)
1689        self.assertEqual(rx_body, as_bytes('routable-message'))
1690
1691        # There shouldn't be any more events now
1692        self.assertFalse(ch._pending_events)
1693
1694        # Ack the message
1695        ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
1696
1697        # Verify that the queue is now empty
1698        self._assert_exact_message_count_with_retries(channel=ch,
1699                                                      queue=q_name,
1700                                                      expected_count=0)
1701
1702
1703class TestPublishAndConsumeWithPubacksAndQosOfOne(BlockingTestCaseBase):
1704
1705    def test(self):  # pylint: disable=R0914,R0915
1706        """BlockingChannel.basic_publish, publish, basic_consume, QoS, \
1707        Basic.Cancel from broker
1708        """
1709        connection = self._connect()
1710
1711        ch = connection.channel()
1712
1713        q_name = 'TestPublishAndConsumeAndQos_q' + uuid.uuid1().hex
1714        exg_name = 'TestPublishAndConsumeAndQos_exg_' + uuid.uuid1().hex
1715        routing_key = 'TestPublishAndConsumeAndQos'
1716
1717        # Place channel in publisher-acknowledgments mode so that publishing
1718        # with mandatory=True will be synchronous
1719        res = ch.confirm_delivery()
1720        self.assertIsNone(res)
1721
1722        # Declare a new exchange
1723        ch.exchange_declare(exg_name, exchange_type='direct')
1724        self.addCleanup(connection.channel().exchange_delete, exg_name)
1725
1726        # Declare a new queue
1727        ch.queue_declare(q_name, auto_delete=True)
1728        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1729
1730        # Bind the queue to the exchange using routing key
1731        ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)
1732
1733        # Deposit a message in the queue
1734        msg1_headers = dict(
1735            test_name='TestPublishAndConsumeWithPubacksAndQosOfOne')
1736        msg1_properties = pika.spec.BasicProperties(headers=msg1_headers)
1737        ch.basic_publish(exg_name, routing_key=routing_key,
1738                         body='via-basic_publish',
1739                         properties=msg1_properties,
1740                         mandatory=True)
1741
1742        # Deposit another message in the queue
1743        ch.basic_publish(exg_name, routing_key, body='via-publish',
1744                         mandatory=True)
1745
1746        # Check that the queue now has two messages
1747        frame = ch.queue_declare(q_name, passive=True)
1748        self.assertEqual(frame.method.message_count, 2)
1749
1750        # Configure QoS for one message
1751        ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False)
1752
1753        # Create a consumer
1754        rx_messages = []
1755        consumer_tag = ch.basic_consume(
1756            q_name,
1757            lambda *args: rx_messages.append(args),
1758            auto_ack=False,
1759            exclusive=False,
1760            arguments=None)
1761
1762        # Wait for first message to arrive
1763        while not rx_messages:
1764            connection.process_data_events(time_limit=None)
1765
1766        self.assertEqual(len(rx_messages), 1)
1767
1768        # Check the first message
1769        msg = rx_messages[0]
1770        self.assertIsInstance(msg, tuple)
1771        rx_ch, rx_method, rx_properties, rx_body = msg
1772        self.assertIs(rx_ch, ch)
1773        self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
1774        self.assertEqual(rx_method.consumer_tag, consumer_tag)
1775        self.assertEqual(rx_method.delivery_tag, 1)
1776        self.assertFalse(rx_method.redelivered)
1777        self.assertEqual(rx_method.exchange, exg_name)
1778        self.assertEqual(rx_method.routing_key, routing_key)
1779
1780        self.assertIsInstance(rx_properties, pika.BasicProperties)
1781        self.assertEqual(rx_properties.headers, msg1_headers)
1782        self.assertEqual(rx_body, as_bytes('via-basic_publish'))
1783
1784        # There shouldn't be any more events now
1785        self.assertFalse(ch._pending_events)
1786
1787        # Ack the message so that the next one can arrive (we configured QoS
1788        # with prefetch_count=1)
1789        ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
1790
1791        # Get the second message
1792        while len(rx_messages) < 2:
1793            connection.process_data_events(time_limit=None)
1794
1795        self.assertEqual(len(rx_messages), 2)
1796
1797        msg = rx_messages[1]
1798        self.assertIsInstance(msg, tuple)
1799        rx_ch, rx_method, rx_properties, rx_body = msg
1800        self.assertIs(rx_ch, ch)
1801        self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
1802        self.assertEqual(rx_method.consumer_tag, consumer_tag)
1803        self.assertEqual(rx_method.delivery_tag, 2)
1804        self.assertFalse(rx_method.redelivered)
1805        self.assertEqual(rx_method.exchange, exg_name)
1806        self.assertEqual(rx_method.routing_key, routing_key)
1807
1808        self.assertIsInstance(rx_properties, pika.BasicProperties)
1809        self.assertEqual(rx_body, as_bytes('via-publish'))
1810
1811        # There shouldn't be any more events now
1812        self.assertFalse(ch._pending_events)
1813
1814        ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
1815
1816        # Verify that the queue is now empty
1817        self._assert_exact_message_count_with_retries(channel=ch,
1818                                                      queue=q_name,
1819                                                      expected_count=0)
1820
1821        # Attempt to consume again with a short timeout
1822        connection.process_data_events(time_limit=0.005)
1823        self.assertEqual(len(rx_messages), 2)
1824
1825        # Delete the queue and wait for consumer cancellation
1826        rx_cancellations = []
1827        ch.add_on_cancel_callback(rx_cancellations.append)
1828        ch.queue_delete(q_name)
1829        ch.start_consuming()
1830
1831        self.assertEqual(len(rx_cancellations), 1)
1832        frame, = rx_cancellations  # pylint: disable=E0632
1833        self.assertEqual(frame.method.consumer_tag, consumer_tag)
1834
1835
1836class TestBasicConsumeWithAckFromAnotherThread(BlockingTestCaseBase):
1837
1838    def test(self):  # pylint: disable=R0914,R0915
1839        """BlockingChannel.basic_consume with ack from another thread and \
1840        requesting basic_ack via add_callback_threadsafe
1841        """
1842        # This test simulates processing of a message on another thread and
1843        # then requesting an ACK to be dispatched on the connection's thread
1844        # via BlockingConnection.add_callback_threadsafe
1845
1846        connection = self._connect()
1847
1848        ch = connection.channel()
1849
1850        q_name = 'TestBasicConsumeWithAckFromAnotherThread_q' + uuid.uuid1().hex
1851        exg_name = ('TestBasicConsumeWithAckFromAnotherThread_exg' +
1852                    uuid.uuid1().hex)
1853        routing_key = 'TestBasicConsumeWithAckFromAnotherThread'
1854
1855        # Place channel in publisher-acknowledgments mode so that publishing
1856        # with mandatory=True will be synchronous (for convenience)
1857        res = ch.confirm_delivery()
1858        self.assertIsNone(res)
1859
1860        # Declare a new exchange
1861        ch.exchange_declare(exg_name, exchange_type='direct')
1862        self.addCleanup(connection.channel().exchange_delete, exg_name)
1863
1864        # Declare a new queue
1865        ch.queue_declare(q_name, auto_delete=True)
1866        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1867
1868        # Bind the queue to the exchange using routing key
1869        ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)
1870
1871        # Publish 2 messages with mandatory=True for synchronous processing
1872        ch.basic_publish(exg_name, routing_key, body='msg1', mandatory=True)
1873        ch.basic_publish(exg_name, routing_key, body='last-msg', mandatory=True)
1874
1875        # Configure QoS for one message so that the 2nd message will be
1876        # delivered only after the 1st one is ACKed
1877        ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False)
1878
1879        # Create a consumer
1880        rx_messages = []
1881        def ackAndEnqueueMessageViaAnotherThread(rx_ch,
1882                                                 rx_method,
1883                                                 rx_properties, # pylint: disable=W0613
1884                                                 rx_body):
1885            LOGGER.debug(
1886                '%s: Got message body=%r; delivery-tag=%r',
1887                datetime.now(), rx_body, rx_method.delivery_tag)
1888
1889            # Request ACK dispatch via add_callback_threadsafe from other
1890            # thread; if last message, cancel consumer so that start_consuming
1891            # can return
1892
1893            def processOnConnectionThread():
1894                LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r',
1895                             datetime.now(),
1896                             rx_body,
1897                             rx_method.delivery_tag)
1898                ch.basic_ack(delivery_tag=rx_method.delivery_tag,
1899                             multiple=False)
1900                rx_messages.append(rx_body)
1901
1902                # NOTE on python3, `b'last-msg' != 'last-msg'`
1903                if rx_body == b'last-msg':
1904                    LOGGER.debug('%s: Canceling consumer consumer-tag=%r',
1905                                 datetime.now(),
1906                                 rx_method.consumer_tag)
1907                    rx_ch.basic_cancel(rx_method.consumer_tag)
1908
1909            # Spawn a thread to initiate ACKing
1910            timer = threading.Timer(0,
1911                                    lambda: connection.add_callback_threadsafe(
1912                                        processOnConnectionThread))
1913            self.addCleanup(timer.cancel)
1914            timer.start()
1915
1916        consumer_tag = ch.basic_consume(
1917            q_name,
1918            ackAndEnqueueMessageViaAnotherThread,
1919            auto_ack=False,
1920            exclusive=False,
1921            arguments=None)
1922
1923        # Wait for both messages
1924        LOGGER.debug('%s: calling start_consuming(); consumer tag=%r',
1925                     datetime.now(),
1926                     consumer_tag)
1927        ch.start_consuming()
1928        LOGGER.debug('%s: Returned from start_consuming(); consumer tag=%r',
1929                     datetime.now(),
1930                     consumer_tag)
1931
1932        self.assertEqual(len(rx_messages), 2)
1933        self.assertEqual(rx_messages[0], b'msg1')
1934        self.assertEqual(rx_messages[1], b'last-msg')
1935
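
# BlockingConnection and BlockingChannel are not thread-safe, so a worker
# thread must never touch the channel directly; instead it hands the ack
# back to the connection's thread via add_callback_threadsafe, which is the
# pattern the test above exercises. The sketch below shows the same
# hand-off in a plain consumer; it is illustrative only, is not used by the
# tests, and assumes pika connection parameters for a reachable broker and
# a queue that already holds at least `num_messages` messages.
def _consume_with_threaded_ack_sketch(params, queue, num_messages):
    """Consume `num_messages` messages, acking each one via a worker thread
    that schedules the ack back onto the connection's thread.
    Illustrative sketch only.
    """
    received = []
    with pika.BlockingConnection(params) as connection:
        channel = connection.channel()
        # Deliver one unacknowledged message at a time
        channel.basic_qos(prefetch_count=1)

        def on_message(ch, method, _properties, body):
            def ack_on_connection_thread():
                # Runs on the connection's thread, so the channel is safe
                # to use here
                ch.basic_ack(delivery_tag=method.delivery_tag)
                received.append(body)
                if len(received) >= num_messages:
                    ch.stop_consuming()

            def worker():
                # Simulated processing on another thread; only the
                # thread-safe scheduling call touches the connection
                connection.add_callback_threadsafe(ack_on_connection_thread)

            threading.Thread(target=worker).start()

        channel.basic_consume(queue, on_message, auto_ack=False)
        channel.start_consuming()
    return received
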
1936
1937class TestConsumeGeneratorWithAckFromAnotherThread(BlockingTestCaseBase):
1938
1939    def test(self):  # pylint: disable=R0914,R0915
1940        """BlockingChannel.consume and requesting basic_ack from another \
1941        thread via add_callback_threadsafe
1942        """
1943        connection = self._connect()
1944
1945        ch = connection.channel()
1946
1947        q_name = ('TestConsumeGeneratorWithAckFromAnotherThread_q' +
1948                  uuid.uuid1().hex)
1949        exg_name = ('TestConsumeGeneratorWithAckFromAnotherThread_exg' +
1950                    uuid.uuid1().hex)
1951        routing_key = 'TestConsumeGeneratorWithAckFromAnotherThread'
1952
1953        # Place channel in publisher-acknowledgments mode so that publishing
1954        # with mandatory=True will be synchronous (for convenience)
1955        res = ch.confirm_delivery()
1956        self.assertIsNone(res)
1957
1958        # Declare a new exchange
1959        ch.exchange_declare(exg_name, exchange_type='direct')
1960        self.addCleanup(connection.channel().exchange_delete, exg_name)
1961
1962        # Declare a new queue
1963        ch.queue_declare(q_name, auto_delete=True)
1964        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
1965
1966        # Bind the queue to the exchange using routing key
1967        ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)
1968
1969        # Publish 2 messages with mandatory=True for synchronous processing
1970        ch.basic_publish(exg_name, routing_key, body='msg1', mandatory=True)
1971        ch.basic_publish(exg_name, routing_key, body='last-msg', mandatory=True)
1972
1973        # Configure QoS for one message so that the 2nd message will be
1974        # delivered only after the 1st one is ACKed
1975        ch.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=False)
1976
1977        # Create a consumer
1978        rx_messages = []
1979        def ackAndEnqueueMessageViaAnotherThread(rx_ch,
1980                                                 rx_method,
1981                                                 rx_properties, # pylint: disable=W0613
1982                                                 rx_body):
1983            LOGGER.debug(
1984                '%s: Got message body=%r; delivery-tag=%r',
1985                datetime.now(), rx_body, rx_method.delivery_tag)
1986
1987            # Request ACK dispatch via add_callback_threadsafe from other
1988            # thread; if last message, cancel consumer so that consumer
1989            # generator completes
1990
1991            def processOnConnectionThread():
1992                LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r',
1993                             datetime.now(),
1994                             rx_body,
1995                             rx_method.delivery_tag)
1996                ch.basic_ack(delivery_tag=rx_method.delivery_tag,
1997                             multiple=False)
1998                rx_messages.append(rx_body)
1999
2000                # NOTE on python3, `b'last-msg' != 'last-msg'`
2001                if rx_body == b'last-msg':
2002                    LOGGER.debug('%s: Canceling consumer consumer-tag=%r',
2003                                 datetime.now(),
2004                                 rx_method.consumer_tag)
2005                    # NOTE Need to use cancel() for the consumer generator
2006                    # instead of basic_cancel()
2007                    rx_ch.cancel()
2008
2009            # Spawn a thread to initiate ACKing
2010            timer = threading.Timer(0,
2011                                    lambda: connection.add_callback_threadsafe(
2012                                        processOnConnectionThread))
2013            self.addCleanup(timer.cancel)
2014            timer.start()
2015
2016        for method, properties, body in ch.consume(q_name, auto_ack=False):
2017            ackAndEnqueueMessageViaAnotherThread(rx_ch=ch,
2018                                                 rx_method=method,
2019                                                 rx_properties=properties,
2020                                                 rx_body=body)
2021
2022        self.assertEqual(len(rx_messages), 2)
2023        self.assertEqual(rx_messages[0], b'msg1')
2024        self.assertEqual(rx_messages[1], b'last-msg')
2025
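
# When consuming through the BlockingChannel.consume generator, the
# consumer is torn down with BlockingChannel.cancel() rather than
# basic_cancel(), as the NOTE in the test above points out. The sketch
# below shows the generator pattern on its own; it is illustrative only,
# is not used by the tests, and assumes pika connection parameters for a
# reachable broker and a queue holding at least `num_messages` messages.
def _consume_generator_sketch(params, queue, num_messages):
    """Read `num_messages` messages from the consumer generator, acking
    each one, then cancel the generator. Illustrative sketch only.
    """
    bodies = []
    with pika.BlockingConnection(params) as connection:
        channel = connection.channel()
        for method, _properties, body in channel.consume(queue,
                                                         auto_ack=False):
            channel.basic_ack(delivery_tag=method.delivery_tag)
            bodies.append(body)
            if len(bodies) >= num_messages:
                break
        # cancel() ends the consumer generator and returns any deliveries
        # that were buffered client-side but not yielded
        channel.cancel()
    return bodies
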
2026
2027class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase):
2028
2029    def test(self):  # pylint: disable=R0914
2030        """BlockingChannel: two basic_consume consumers on same channel
2031        """
2032        connection = self._connect()
2033
2034        ch = connection.channel()
2035
        exg_name = 'TestTwoBasicConsumersOnSameChannel_exg_' + uuid.uuid1().hex
2037        q1_name = 'TestTwoBasicConsumersOnSameChannel_q1' + uuid.uuid1().hex
2038        q2_name = 'TestTwoBasicConsumersOnSameChannel_q2' + uuid.uuid1().hex
2039        q1_routing_key = 'TestTwoBasicConsumersOnSameChannel1'
2040        q2_routing_key = 'TestTwoBasicConsumersOnSameChannel2'
2041
2042        # Place channel in publisher-acknowledgments mode so that publishing
2043        # with mandatory=True will be synchronous
2044        ch.confirm_delivery()
2045
2046        # Declare a new exchange
2047        ch.exchange_declare(exg_name, exchange_type='direct')
2048        self.addCleanup(connection.channel().exchange_delete, exg_name)
2049
2050        # Declare the two new queues and bind them to the exchange
2051        ch.queue_declare(q1_name, auto_delete=True)
2052        self.addCleanup(lambda: self._connect().channel().queue_delete(q1_name))
2053        ch.queue_bind(q1_name, exchange=exg_name, routing_key=q1_routing_key)
2054
2055        ch.queue_declare(q2_name, auto_delete=True)
2056        self.addCleanup(lambda: self._connect().channel().queue_delete(q2_name))
2057        ch.queue_bind(q2_name, exchange=exg_name, routing_key=q2_routing_key)
2058
2059        # Deposit messages in the queues
2060        q1_tx_message_bodies = ['q1_message+%s' % (i,)
2061                                for i in pika.compat.xrange(100)]
2062        for message_body in q1_tx_message_bodies:
2063            ch.basic_publish(exg_name, q1_routing_key, body=message_body, mandatory=True)
2064
2065        q2_tx_message_bodies = ['q2_message+%s' % (i,)
2066                                for i in pika.compat.xrange(150)]
2067        for message_body in q2_tx_message_bodies:
2068            ch.basic_publish(exg_name, q2_routing_key, body=message_body, mandatory=True)
2069
2070        # Create the consumers
2071        q1_rx_messages = []
2072        q1_consumer_tag = ch.basic_consume(
2073            q1_name,
2074            lambda *args: q1_rx_messages.append(args),
2075            auto_ack=False,
2076            exclusive=False,
2077            arguments=None)
2078
2079        q2_rx_messages = []
2080        q2_consumer_tag = ch.basic_consume(
2081            q2_name,
2082            lambda *args: q2_rx_messages.append(args),
2083            auto_ack=False,
2084            exclusive=False,
2085            arguments=None)
2086
2087        # Wait for all messages to be delivered
2088        while (len(q1_rx_messages) < len(q1_tx_message_bodies) or
2089               len(q2_rx_messages) < len(q2_tx_message_bodies)):
2090            connection.process_data_events(time_limit=None)
2091
2092        self.assertEqual(len(q2_rx_messages), len(q2_tx_message_bodies))
2093
2094        # Verify the messages
2095        def validate_messages(rx_messages,
2096                              routing_key,
2097                              consumer_tag,
2098                              tx_message_bodies):
2099            self.assertEqual(len(rx_messages), len(tx_message_bodies))
2100
2101            for msg, expected_body in zip(rx_messages, tx_message_bodies):
2102                self.assertIsInstance(msg, tuple)
2103                rx_ch, rx_method, rx_properties, rx_body = msg
2104                self.assertIs(rx_ch, ch)
2105                self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
2106                self.assertEqual(rx_method.consumer_tag, consumer_tag)
2107                self.assertFalse(rx_method.redelivered)
2108                self.assertEqual(rx_method.exchange, exg_name)
2109                self.assertEqual(rx_method.routing_key, routing_key)
2110
2111                self.assertIsInstance(rx_properties, pika.BasicProperties)
2112                self.assertEqual(rx_body, as_bytes(expected_body))
2113
2114        # Validate q1 consumed messages
2115        validate_messages(rx_messages=q1_rx_messages,
2116                          routing_key=q1_routing_key,
2117                          consumer_tag=q1_consumer_tag,
2118                          tx_message_bodies=q1_tx_message_bodies)
2119
2120        # Validate q2 consumed messages
2121        validate_messages(rx_messages=q2_rx_messages,
2122                          routing_key=q2_routing_key,
2123                          consumer_tag=q2_consumer_tag,
2124                          tx_message_bodies=q2_tx_message_bodies)
2125
2126        # There shouldn't be any more events now
2127        self.assertFalse(ch._pending_events)
2128
2129
2130class TestBasicCancelPurgesPendingConsumerCancellationEvt(BlockingTestCaseBase):
2131
2132    def test(self):
2133        """BlockingChannel.basic_cancel purges pending _ConsumerCancellationEvt""" # pylint: disable=C0301
2134        connection = self._connect()
2135
2136        ch = connection.channel()
2137
2138        q_name = ('TestBasicCancelPurgesPendingConsumerCancellationEvt_q' +
2139                  uuid.uuid1().hex)
2140
2141        ch.queue_declare(q_name)
2142
2143        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2144
2145        ch.basic_publish('', routing_key=q_name, body='via-publish', mandatory=True)
2146
        # Create a consumer. Not passing a 'consumer_tag' to test
        # client-generated consumer tags
2149        rx_messages = []
2150        consumer_tag = ch.basic_consume(
2151            q_name,
2152            lambda *args: rx_messages.append(args))
2153
2154        # Wait for the published message to arrive, but don't consume it
2155        while not ch._pending_events:
2156            # Issue synchronous command that forces processing of incoming I/O
2157            connection.channel().close()
2158
2159        self.assertEqual(len(ch._pending_events), 1)
2160        self.assertIsInstance(ch._pending_events[0],
2161                              blocking_connection._ConsumerDeliveryEvt)
2162
2163        # Delete the queue and wait for broker-initiated consumer cancellation
2164        ch.queue_delete(q_name)
2165        while len(ch._pending_events) < 2:
2166            # Issue synchronous command that forces processing of incoming I/O
2167            connection.channel().close()
2168
2169        self.assertEqual(len(ch._pending_events), 2)
2170        self.assertIsInstance(ch._pending_events[1],
2171                              blocking_connection._ConsumerCancellationEvt)
2172
2173        # Issue consumer cancellation and verify that the pending
2174        # _ConsumerCancellationEvt instance was removed
2175        messages = ch.basic_cancel(consumer_tag)
2176        self.assertEqual(messages, [])
2177
2178        self.assertEqual(len(ch._pending_events), 0)
2179
2180
2181class TestBasicPublishWithoutPubacks(BlockingTestCaseBase):
2182
2183    def test(self):  # pylint: disable=R0914,R0915
2184        """BlockingChannel.basic_publish without pubacks"""
2185        connection = self._connect()
2186
2187        ch = connection.channel()
2188
2189        q_name = 'TestBasicPublishWithoutPubacks_q' + uuid.uuid1().hex
2190        exg_name = 'TestBasicPublishWithoutPubacks_exg_' + uuid.uuid1().hex
2191        routing_key = 'TestBasicPublishWithoutPubacks'
2192
2193        # Declare a new exchange
2194        ch.exchange_declare(exg_name, exchange_type='direct')
2195        self.addCleanup(connection.channel().exchange_delete, exg_name)
2196
2197        # Declare a new queue
2198        ch.queue_declare(q_name, auto_delete=True)
2199        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2200
2201        # Bind the queue to the exchange using routing key
2202        ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key)
2203
2204        # Deposit a message in the queue with mandatory=True
2205        msg1_headers = dict(
2206            test_name='TestBasicPublishWithoutPubacks')
2207        msg1_properties = pika.spec.BasicProperties(headers=msg1_headers)
2208        ch.basic_publish(exg_name, routing_key=routing_key,
2209                         body='via-basic_publish_mandatory=True',
2210                         properties=msg1_properties,
2211                         mandatory=True)
2212
2213        # Deposit a message in the queue with mandatory=False
2214        ch.basic_publish(exg_name, routing_key=routing_key,
2215                         body='via-basic_publish_mandatory=False',
2216                         mandatory=False)
2217
2218        # Wait for the messages to arrive in queue
2219        self._assert_exact_message_count_with_retries(channel=ch,
2220                                                      queue=q_name,
2221                                                      expected_count=2)
2222
        # Create a consumer. Not passing a 'consumer_tag' to test
        # client-generated consumer tags
2225        rx_messages = []
2226        consumer_tag = ch.basic_consume(
2227            q_name,
2228            lambda *args: rx_messages.append(args),
2229            auto_ack=False,
2230            exclusive=False,
2231            arguments=None)
2232
2233        # Wait for first message to arrive
2234        while not rx_messages:
2235            connection.process_data_events(time_limit=None)
2236
2237        self.assertGreaterEqual(len(rx_messages), 1)
2238
2239        # Check the first message
2240        msg = rx_messages[0]
2241        self.assertIsInstance(msg, tuple)
2242        rx_ch, rx_method, rx_properties, rx_body = msg
2243        self.assertIs(rx_ch, ch)
2244        self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
2245        self.assertEqual(rx_method.consumer_tag, consumer_tag)
2246        self.assertEqual(rx_method.delivery_tag, 1)
2247        self.assertFalse(rx_method.redelivered)
2248        self.assertEqual(rx_method.exchange, exg_name)
2249        self.assertEqual(rx_method.routing_key, routing_key)
2250
2251        self.assertIsInstance(rx_properties, pika.BasicProperties)
2252        self.assertEqual(rx_properties.headers, msg1_headers)
2253        self.assertEqual(rx_body, as_bytes('via-basic_publish_mandatory=True'))
2254
2255        # There shouldn't be any more events now
2256        self.assertFalse(ch._pending_events)
2257
        # Ack the first message
2260        ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
2261
2262        # Get the second message
2263        while len(rx_messages) < 2:
2264            connection.process_data_events(time_limit=None)
2265
2266        self.assertEqual(len(rx_messages), 2)
2267
2268        msg = rx_messages[1]
2269        self.assertIsInstance(msg, tuple)
2270        rx_ch, rx_method, rx_properties, rx_body = msg
2271        self.assertIs(rx_ch, ch)
2272        self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
2273        self.assertEqual(rx_method.consumer_tag, consumer_tag)
2274        self.assertEqual(rx_method.delivery_tag, 2)
2275        self.assertFalse(rx_method.redelivered)
2276        self.assertEqual(rx_method.exchange, exg_name)
2277        self.assertEqual(rx_method.routing_key, routing_key)
2278
2279        self.assertIsInstance(rx_properties, pika.BasicProperties)
2280        self.assertEqual(rx_body, as_bytes('via-basic_publish_mandatory=False'))
2281
2282        # There shouldn't be any more events now
2283        self.assertFalse(ch._pending_events)
2284
2285        ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
2286
2287        # Verify that the queue is now empty
2288        self._assert_exact_message_count_with_retries(channel=ch,
2289                                                      queue=q_name,
2290                                                      expected_count=0)
2291
2292        # Attempt to consume again with a short timeout
2293        connection.process_data_events(time_limit=0.005)
2294        self.assertEqual(len(rx_messages), 2)
2295
2296
2297class TestPublishFromBasicConsumeCallback(BlockingTestCaseBase):
2298
2299    def test(self):
2300        """BlockingChannel.basic_publish from basic_consume callback
2301        """
2302        connection = self._connect()
2303
2304        ch = connection.channel()
2305
2306        src_q_name = (
2307            'TestPublishFromBasicConsumeCallback_src_q' + uuid.uuid1().hex)
2308        dest_q_name = (
2309            'TestPublishFromBasicConsumeCallback_dest_q' + uuid.uuid1().hex)
2310
2311        # Place channel in publisher-acknowledgments mode so that publishing
2312        # with mandatory=True will be synchronous
2313        ch.confirm_delivery()
2314
2315        # Declare source and destination queues
2316        ch.queue_declare(src_q_name, auto_delete=True)
2317        self.addCleanup(lambda: self._connect().channel().queue_delete(src_q_name))
2318        ch.queue_declare(dest_q_name, auto_delete=True)
2319        self.addCleanup(lambda: self._connect().channel().queue_delete(dest_q_name))
2320
2321        # Deposit a message in the source queue
2322        ch.basic_publish('',
2323                         routing_key=src_q_name,
2324                         body='via-publish',
2325                         mandatory=True)
2326
2327        # Create a consumer
2328        def on_consume(channel, method, props, body):
2329            channel.basic_publish(
2330                '', routing_key=dest_q_name, body=body,
2331                properties=props, mandatory=True)
2332            channel.basic_ack(method.delivery_tag)
2333
2334        ch.basic_consume(src_q_name,
2335                         on_consume,
2336                         auto_ack=False,
2337                         exclusive=False,
2338                         arguments=None)
2339
2340        # Consume from destination queue
2341        for _, _, rx_body in ch.consume(dest_q_name, auto_ack=True):
2342            self.assertEqual(rx_body, as_bytes('via-publish'))
2343            break
2344        else:
            self.fail('failed to consume a message from the destination queue')
2346
2347
2348class TestStopConsumingFromBasicConsumeCallback(BlockingTestCaseBase):
2349
2350    def test(self):
2351        """BlockingChannel.stop_consuming from basic_consume callback
2352        """
2353        connection = self._connect()
2354
2355        ch = connection.channel()
2356
2357        q_name = (
2358            'TestStopConsumingFromBasicConsumeCallback_q' + uuid.uuid1().hex)
2359
2360        # Place channel in publisher-acknowledgments mode so that publishing
2361        # with mandatory=True will be synchronous
2362        ch.confirm_delivery()
2363
2364        # Declare the queue
2365        ch.queue_declare(q_name, auto_delete=False)
2366        self.addCleanup(connection.channel().queue_delete, q_name)
2367
2368        # Deposit two messages in the queue
2369        ch.basic_publish('',
2370                         routing_key=q_name,
2371                         body='via-publish1',
2372                         mandatory=True)
2373
2374        ch.basic_publish('',
2375                         routing_key=q_name,
2376                         body='via-publish2',
2377                         mandatory=True)
2378
2379        # Create a consumer
2380        def on_consume(channel, method, props, body):  # pylint: disable=W0613
2381            channel.stop_consuming()
2382            channel.basic_ack(method.delivery_tag)
2383
2384        ch.basic_consume(q_name,
2385                         on_consume,
2386                         auto_ack=False,
2387                         exclusive=False,
2388                         arguments=None)
2389
2390        ch.start_consuming()
2391
2392        ch.close()
2393
2394        ch = connection.channel()
2395
2396        # Verify that only the second message is present in the queue
2397        _, _, rx_body = ch.basic_get(q_name)
2398        self.assertEqual(rx_body, as_bytes('via-publish2'))
2399
2400        msg = ch.basic_get(q_name)
2401        self.assertTupleEqual(msg, (None, None, None))
2402
2403
2404class TestCloseChannelFromBasicConsumeCallback(BlockingTestCaseBase):
2405
2406    def test(self):
2407        """BlockingChannel.close from basic_consume callback
2408        """
2409        connection = self._connect()
2410
2411        ch = connection.channel()
2412
2413        q_name = (
2414            'TestCloseChannelFromBasicConsumeCallback_q' + uuid.uuid1().hex)
2415
2416        # Place channel in publisher-acknowledgments mode so that publishing
2417        # with mandatory=True will be synchronous
2418        ch.confirm_delivery()
2419
2420        # Declare the queue
2421        ch.queue_declare(q_name, auto_delete=False)
2422        self.addCleanup(connection.channel().queue_delete, q_name)
2423
2424        # Deposit two messages in the queue
2425        ch.basic_publish('',
2426                         routing_key=q_name,
2427                         body='via-publish1',
2428                         mandatory=True)
2429
2430        ch.basic_publish('',
2431                         routing_key=q_name,
2432                         body='via-publish2',
2433                         mandatory=True)
2434
2435        # Create a consumer
2436        def on_consume(channel, method, props, body):  # pylint: disable=W0613
2437            channel.close()
2438
2439        ch.basic_consume(q_name,
2440                         on_consume,
2441                         auto_ack=False,
2442                         exclusive=False,
2443                         arguments=None)
2444
2445        ch.start_consuming()
2446
2447        self.assertTrue(ch.is_closed)
2448
2449
2450        # Verify that both messages are present in the queue
2451        ch = connection.channel()
2452        _, _, rx_body = ch.basic_get(q_name)
2453        self.assertEqual(rx_body, as_bytes('via-publish1'))
2454        _, _, rx_body = ch.basic_get(q_name)
2455        self.assertEqual(rx_body, as_bytes('via-publish2'))
2456
2457
2458class TestCloseConnectionFromBasicConsumeCallback(BlockingTestCaseBase):
2459
2460    def test(self):
2461        """BlockingConnection.close from basic_consume callback
2462        """
2463        connection = self._connect()
2464
2465        ch = connection.channel()
2466
2467        q_name = (
2468            'TestCloseConnectionFromBasicConsumeCallback_q' + uuid.uuid1().hex)
2469
2470        # Place channel in publisher-acknowledgments mode so that publishing
2471        # with mandatory=True will be synchronous
2472        ch.confirm_delivery()
2473
2474        # Declare the queue
2475        ch.queue_declare(q_name, auto_delete=False)
2476        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2477
2478        # Deposit two messages in the queue
2479        ch.basic_publish('',
2480                         routing_key=q_name,
2481                         body='via-publish1',
2482                         mandatory=True)
2483
2484        ch.basic_publish('',
2485                         routing_key=q_name,
2486                         body='via-publish2',
2487                         mandatory=True)
2488
2489        # Create a consumer
2490        def on_consume(channel, method, props, body):  # pylint: disable=W0613
2491            connection.close()
2492
2493        ch.basic_consume(q_name,
2494                         on_consume,
2495                         auto_ack=False,
2496                         exclusive=False,
2497                         arguments=None)
2498
2499        ch.start_consuming()
2500
2501        self.assertTrue(ch.is_closed)
2502        self.assertTrue(connection.is_closed)
2503
2504
2505        # Verify that both messages are present in the queue
2506        ch = self._connect().channel()
2507        _, _, rx_body = ch.basic_get(q_name)
2508        self.assertEqual(rx_body, as_bytes('via-publish1'))
2509        _, _, rx_body = ch.basic_get(q_name)
2510        self.assertEqual(rx_body, as_bytes('via-publish2'))
2511
2512
2513class TestStartConsumingRaisesChannelClosedOnSameChannelFailure(BlockingTestCaseBase):
2514
2515    def test(self):
2516        """start_consuming() exits with ChannelClosed exception on same channel failure
2517        """
2518        connection = self._connect()
2519
        # Fail test if exception leaks back into the I/O loop
2521        self._instrument_io_loop_exception_leak_detection(connection)
2522
2523        ch = connection.channel()
2524
2525        q_name = (
2526            'TestStartConsumingPassesChannelClosedOnSameChannelFailure_q' +
2527            uuid.uuid1().hex)
2528
2529        # Declare the queue
2530        ch.queue_declare(q_name, auto_delete=False)
2531        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2532
2533        ch.basic_consume(q_name,
2534                         lambda *args, **kwargs: None,
2535                         auto_ack=False,
2536                         exclusive=False,
2537                         arguments=None)
2538
2539        # Schedule a callback that will cause a channel error on the consumer's
2540        # channel by publishing to an unknown exchange. This will cause the
2541        # broker to close our channel.
2542        connection.add_callback_threadsafe(
2543            lambda: ch.basic_publish(
2544                exchange=q_name,
2545                routing_key='123',
2546                body=b'Nope this is wrong'))
2547
2548        with self.assertRaises(pika.exceptions.ChannelClosedByBroker):
2549            ch.start_consuming()
2550
2551
2552class TestStartConsumingReturnsAfterCancelFromBroker(BlockingTestCaseBase):
2553
2554    def test(self):
2555        """start_consuming() returns after Cancel from broker
2556        """
2557        connection = self._connect()
2558
2559        ch = connection.channel()
2560
2561        q_name = (
2562            'TestStartConsumingExitsOnCancelFromBroker_q' + uuid.uuid1().hex)
2563
2564        # Declare the queue
2565        ch.queue_declare(q_name, auto_delete=False)
2566        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2567
2568        consumer_tag = ch.basic_consume(q_name,
2569                                        lambda *args, **kwargs: None,
2570                                        auto_ack=False,
2571                                        exclusive=False,
2572                                        arguments=None)
2573
2574        # Schedule a callback that will run while start_consuming() is
2575        # executing and delete the queue. This will cause the broker to cancel
2576        # our consumer
2577        connection.add_callback_threadsafe(
2578            lambda: self._connect().channel().queue_delete(q_name))
2579
2580        ch.start_consuming()
2581
2582        self.assertNotIn(consumer_tag, ch._consumer_infos)
2583
2584
2585class TestNonPubAckPublishAndConsumeHugeMessage(BlockingTestCaseBase):
2586
2587    def test(self):
2588        """BlockingChannel.publish/consume huge message"""
2589        connection = self._connect()
2590
2591        ch = connection.channel()
2592
2593        q_name = 'TestPublishAndConsumeHugeMessage_q' + uuid.uuid1().hex
2594        body = 'a' * 1000000
2595
2596        # Declare a new queue
2597        ch.queue_declare(q_name, auto_delete=False)
2598        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2599
2600        # Publish a message to the queue by way of default exchange
2601        ch.basic_publish(exchange='', routing_key=q_name, body=body)
2602        LOGGER.info('Published message body size=%s', len(body))
2603
2604        # Consume the message
2605        for rx_method, rx_props, rx_body in ch.consume(q_name, auto_ack=False,
2606                                                       exclusive=False,
2607                                                       arguments=None):
2608            self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
2609            self.assertEqual(rx_method.delivery_tag, 1)
2610            self.assertFalse(rx_method.redelivered)
2611            self.assertEqual(rx_method.exchange, '')
2612            self.assertEqual(rx_method.routing_key, q_name)
2613
2614            self.assertIsInstance(rx_props, pika.BasicProperties)
2615            self.assertEqual(rx_body, as_bytes(body))
2616
2617            # Ack the message
2618            ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
2619
2620            break
2621
2622        # There shouldn't be any more events now
2623        self.assertFalse(ch._queue_consumer_generator.pending_events)
2624
2625        # Verify that the queue is now empty
2626        ch.close()
2627        ch = connection.channel()
2628        self._assert_exact_message_count_with_retries(channel=ch,
2629                                                      queue=q_name,
2630                                                      expected_count=0)
2631
2632
2633class TestNonPubAckPublishAndConsumeManyMessages(BlockingTestCaseBase):
2634
2635    def test(self):
2636        """BlockingChannel non-pub-ack publish/consume many messages"""
2637        connection = self._connect()
2638
2639        ch = connection.channel()
2640
2641        q_name = ('TestNonPubackPublishAndConsumeManyMessages_q' +
2642                  uuid.uuid1().hex)
2643        body = 'b' * 1024
2644
2645        num_messages_to_publish = 500
2646
2647        # Declare a new queue
2648        ch.queue_declare(q_name, auto_delete=False)
2649        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2650
2651        for _ in pika.compat.xrange(num_messages_to_publish):
2652            # Publish a message to the queue by way of default exchange
2653            ch.basic_publish(exchange='', routing_key=q_name, body=body)
2654
2655        # Consume the messages
2656        num_consumed = 0
2657        for rx_method, rx_props, rx_body in ch.consume(q_name,
2658                                                       auto_ack=False,
2659                                                       exclusive=False,
2660                                                       arguments=None):
2661            num_consumed += 1
2662            self.assertIsInstance(rx_method, pika.spec.Basic.Deliver)
2663            self.assertEqual(rx_method.delivery_tag, num_consumed)
2664            self.assertFalse(rx_method.redelivered)
2665            self.assertEqual(rx_method.exchange, '')
2666            self.assertEqual(rx_method.routing_key, q_name)
2667
2668            self.assertIsInstance(rx_props, pika.BasicProperties)
2669            self.assertEqual(rx_body, as_bytes(body))
2670
2671            # Ack the message
2672            ch.basic_ack(delivery_tag=rx_method.delivery_tag, multiple=False)
2673
2674            if num_consumed >= num_messages_to_publish:
2675                break
2676
2677        # There shouldn't be any more events now
2678        self.assertFalse(ch._queue_consumer_generator.pending_events)
2679
2680        ch.close()
2681
2682        self.assertIsNone(ch._queue_consumer_generator)
2683
2684        # Verify that the queue is now empty
2685        ch = connection.channel()
2686        self._assert_exact_message_count_with_retries(channel=ch,
2687                                                      queue=q_name,
2688                                                      expected_count=0)
2689
2690
2691class TestBasicCancelWithNonAckableConsumer(BlockingTestCaseBase):
2692
2693    def test(self):
2694        """BlockingChannel user cancels non-ackable consumer via basic_cancel"""
2695        connection = self._connect()
2696
2697        ch = connection.channel()
2698
2699        q_name = (
2700            'TestBasicCancelWithNonAckableConsumer_q' + uuid.uuid1().hex)
2701
2702        body1 = 'a' * 1024
2703        body2 = 'b' * 2048
2704
2705        # Declare a new queue
2706        ch.queue_declare(q_name, auto_delete=False)
2707        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2708
2709        # Publish two messages to the queue by way of default exchange
2710        ch.basic_publish(exchange='', routing_key=q_name, body=body1)
2711        ch.basic_publish(exchange='', routing_key=q_name, body=body2)
2712
2713        # Wait for queue to contain both messages
2714        self._assert_exact_message_count_with_retries(channel=ch,
2715                                                      queue=q_name,
2716                                                      expected_count=2)
2717
2718        # Create a consumer that uses automatic ack mode
2719        consumer_tag = ch.basic_consume(q_name, lambda *x: None, auto_ack=True,
2720                                        exclusive=False, arguments=None)
2721
2722        # Wait for all messages to be sent by broker to client
2723        self._assert_exact_message_count_with_retries(channel=ch,
2724                                                      queue=q_name,
2725                                                      expected_count=0)
2726
2727        # Cancel the consumer
2728        messages = ch.basic_cancel(consumer_tag)
2729
2730        # Both messages should have been on their way when we cancelled
2731        self.assertEqual(len(messages), 2)
2732
2733        _, _, rx_body1 = messages[0]
2734        self.assertEqual(rx_body1, as_bytes(body1))
2735
2736        _, _, rx_body2 = messages[1]
2737        self.assertEqual(rx_body2, as_bytes(body2))
2738
2739        ch.close()
2740
2741        ch = connection.channel()
2742
2743        # Verify that the queue is now empty
2744        frame = ch.queue_declare(q_name, passive=True)
2745        self.assertEqual(frame.method.message_count, 0)
2746
2747
2748class TestBasicCancelWithAckableConsumer(BlockingTestCaseBase):
2749
2750    def test(self):
2751        """BlockingChannel user cancels ackable consumer via basic_cancel"""
2752        connection = self._connect()
2753
2754        ch = connection.channel()
2755
2756        q_name = (
2757            'TestBasicCancelWithAckableConsumer_q' + uuid.uuid1().hex)
2758
2759        body1 = 'a' * 1024
2760        body2 = 'b' * 2048
2761
2762        # Declare a new queue
2763        ch.queue_declare(q_name, auto_delete=False)
2764        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2765
2766        # Publish two messages to the queue by way of default exchange
2767        ch.basic_publish(exchange='', routing_key=q_name, body=body1)
2768        ch.basic_publish(exchange='', routing_key=q_name, body=body2)
2769
2770        # Wait for queue to contain both messages
2771        self._assert_exact_message_count_with_retries(channel=ch,
2772                                                      queue=q_name,
2773                                                      expected_count=2)
2774
2775        # Create an ackable consumer
2776        consumer_tag = ch.basic_consume(q_name, lambda *x: None, auto_ack=False,
2777                                        exclusive=False, arguments=None)
2778
2779        # Wait for all messages to be sent by broker to client
2780        self._assert_exact_message_count_with_retries(channel=ch,
2781                                                      queue=q_name,
2782                                                      expected_count=0)
2783
2784        # Cancel the consumer
2785        messages = ch.basic_cancel(consumer_tag)
2786
2787        # basic_cancel of an ackable consumer returns no pending messages
2788        self.assertEqual(len(messages), 0)
2789
2790        ch.close()
2791
2792        ch = connection.channel()
2793
2794        # Verify that canceling the ackable consumer restored both messages
2795        self._assert_exact_message_count_with_retries(channel=ch,
2796                                                      queue=q_name,
2797                                                      expected_count=2)
2798
2799
2800class TestUnackedMessageAutoRestoredToQueueOnChannelClose(BlockingTestCaseBase):
2801
2802    def test(self):
2803        """BlockingChannel unacked message restored to q on channel close """
2804        connection = self._connect()
2805
2806        ch = connection.channel()
2807
2808        q_name = ('TestUnackedMessageAutoRestoredToQueueOnChannelClose_q' +
2809                  uuid.uuid1().hex)
2810
2811        body1 = 'a' * 1024
2812        body2 = 'b' * 2048
2813
2814        # Declare a new queue
2815        ch.queue_declare(q_name, auto_delete=False)
2816        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2817
2818        # Publish two messages to the queue by way of default exchange
2819        ch.basic_publish(exchange='', routing_key=q_name, body=body1)
2820        ch.basic_publish(exchange='', routing_key=q_name, body=body2)
2821
2822        # Consume the events, but don't ack
2823        rx_messages = []
2824        ch.basic_consume(q_name, lambda *args: rx_messages.append(args),
2825                         auto_ack=False, exclusive=False, arguments=None)
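        # Pump connection I/O until both deliveries reach the consumer callback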
2826        while len(rx_messages) != 2:
2827            connection.process_data_events(time_limit=None)
2828
2829        self.assertEqual(rx_messages[0][1].delivery_tag, 1)
2830        self.assertEqual(rx_messages[1][1].delivery_tag, 2)
2831
2832        # Verify no more ready messages in queue
2833        frame = ch.queue_declare(q_name, passive=True)
2834        self.assertEqual(frame.method.message_count, 0)
2835
2836        # Closing channel should restore messages back to queue
2837        ch.close()
2838
2839        # Verify that there are two messages in q now
2840        ch = connection.channel()
2841
2842        self._assert_exact_message_count_with_retries(channel=ch,
2843                                                      queue=q_name,
2844                                                      expected_count=2)
2845
2846
2847class TestNoAckMessageNotRestoredToQueueOnChannelClose(BlockingTestCaseBase):
2848
2849    def test(self):
2850        """BlockingChannel no-ack message not restored to q on channel close """
2851        connection = self._connect()
2852
2853        ch = connection.channel()
2854
2855        q_name = ('TestNoAckMessageNotRestoredToQueueOnChannelClose_q' +
2856                  uuid.uuid1().hex)
2857
2858        body1 = 'a' * 1024
2859        body2 = 'b' * 2048
2860
2861        # Declare a new queue
2862        ch.queue_declare(q_name, auto_delete=False)
2863        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2864
2865        # Publish two messages to the queue by way of default exchange
2866        ch.basic_publish(exchange='', routing_key=q_name, body=body1)
2867        ch.basic_publish(exchange='', routing_key=q_name, body=body2)
2868
2869        # Consume in no-ack (auto_ack=True) mode; no explicit acks are sent
2870        num_messages = 0
2871        for rx_method, _, _ in ch.consume(q_name, auto_ack=True, exclusive=False):
2872            num_messages += 1
2873
2874            self.assertEqual(rx_method.delivery_tag, num_messages)
2875
2876            if num_messages == 2:
2877                break
2878        else:
2879            self.fail('expected 2 messages, but consumed %i' % (num_messages,))
2880
2881        # Verify no more ready messages in queue
2882        frame = ch.queue_declare(q_name, passive=True)
2883        self.assertEqual(frame.method.message_count, 0)
2884
2885        # Closing channel should not restore no-ack messages back to queue
2886        ch.close()
2887
2888        # Verify that there are no messages in q now
2889        ch = connection.channel()
2890        frame = ch.queue_declare(q_name, passive=True)
2891        self.assertEqual(frame.method.message_count, 0)
2892
2893
2894class TestConsumeGeneratorInactivityTimeout(BlockingTestCaseBase):
2895
2896    def test(self):
2897        """BlockingChannel consume returns 3-tuple of None values on inactivity timeout """
2898        connection = self._connect()
2899
2900        ch = connection.channel()
2901
2902        q_name = ('TestConsumeGeneratorInactivityTimeout_q' + uuid.uuid1().hex)
2903
2904        # Declare a new queue
2905        ch.queue_declare(q_name, auto_delete=True)
2906
2907        # Expect to get only (None, None, None) upon inactivity timeout, since
2908        # there are no messages in queue
2909        for msg in ch.consume(q_name, inactivity_timeout=0.1):
2910            self.assertEqual(msg, (None, None, None))
2911            break
2912        else:
2913            self.fail('expected (None, None, None), but iterator stopped')
2914
2915
2916class TestConsumeGeneratorInterruptedByCancelFromBroker(BlockingTestCaseBase):
2917
2918    def test(self):
2919        """BlockingChannel consume generator is interrupted by broker's Cancel """
2920        connection = self._connect()
2921
2922        self.assertTrue(connection.consumer_cancel_notify_supported)
2923
2924        ch = connection.channel()
2925
2926        q_name = ('TestConsumeGeneratorInterruptedByCancelFromBroker_q' +
2927                  uuid.uuid1().hex)
2928
2929        # Declare a new queue
2930        ch.queue_declare(q_name, auto_delete=True)
2931
2932        queue_deleted = False
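        # The generator is expected to exit on its own once the broker's
        # Basic.Cancel is dispatched, which ends this loop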
2933        for _ in ch.consume(q_name, auto_ack=False, inactivity_timeout=0.001):
2934            if not queue_deleted:
2935                # Delete the queue to force Basic.Cancel from the broker
2936                ch.queue_delete(q_name)
2937                queue_deleted = True
2938
2939        self.assertIsNone(ch._queue_consumer_generator)
2940
2941
2942class TestConsumeGeneratorCancelEncountersCancelFromBroker(BlockingTestCaseBase):
2943
2944    def test(self):
2945        """BlockingChannel consume generator cancel called when broker's Cancel is enqueued """
2946        connection = self._connect()
2947
2948        self.assertTrue(connection.consumer_cancel_notify_supported)
2949
2950        ch = connection.channel()
2951
2952        q_name = ('TestConsumeGeneratorCancelEncountersCancelFromBroker_q' +
2953                  uuid.uuid1().hex)
2954
2955        # Declare a new queue
2956        ch.queue_declare(q_name, auto_delete=True)
2957
2958        for _ in ch.consume(q_name, auto_ack=False, inactivity_timeout=0.001):
2959            # Delete the queue to force Basic.Cancel from the broker
2960            ch.queue_delete(q_name)
2961
2962            # Wait for server's Basic.Cancel
2963            while not ch._queue_consumer_generator.pending_events:
2964                connection.process_data_events()
2965
2966            # Confirm it's Basic.Cancel
2967            self.assertIsInstance(ch._queue_consumer_generator.pending_events[0],
2968                                  blocking_connection._ConsumerCancellationEvt)
2969
2970            # Now attempt to cancel the consumer generator
2971            ch.cancel()
2972
2973            self.assertIsNone(ch._queue_consumer_generator)
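            # cancel() disposed of the generator, so the loop ends after this
            # iteration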
2974
2975
2976class TestConsumeGeneratorPassesChannelClosedOnSameChannelFailure(BlockingTestCaseBase):
2977
2978    def test(self):
2979        """consume() exits with ChannelClosed exception on same channel failure
2980        """
2981        connection = self._connect()
2982
2983        # Fail test if exception leaks back into I/O loop
2984        self._instrument_io_loop_exception_leak_detection(connection)
2985
2986        ch = connection.channel()
2987
2988        q_name = (
2989            'TestConsumeGeneratorPassesChannelClosedOnSameChannelFailure_q' +
2990            uuid.uuid1().hex)
2991
2992        # Declare the queue
2993        ch.queue_declare(q_name, auto_delete=False)
2994        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
2995
2996        # Schedule a callback that will cause a channel error on the consumer's
2997        # channel by publishing to an unknown exchange. This will cause the
2998        # broker to close our channel.
2999        connection.add_callback_threadsafe(
3000            lambda: ch.basic_publish(
3001                exchange=q_name,
3002                routing_key='123',
3003                body=b'Nope this is wrong'))
3004
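        # Iterating the consume generator drives connection I/O, which runs
        # the scheduled publish; the resulting channel error surfaces from
        # consume() as ChannelClosedByBroker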
3005        with self.assertRaises(pika.exceptions.ChannelClosedByBroker):
3006            for _ in ch.consume(q_name):
3007                pass
3008
3009
3010class TestChannelFlow(BlockingTestCaseBase):
3011
3012    def test(self):
3013        """BlockingChannel Channel.Flow activate """
3014        connection = self._connect()
3015
3016        ch = connection.channel()
3017
3018        q_name = ('TestChannelFlow_q' + uuid.uuid1().hex)
3019
3020        # Declare a new queue
3021        ch.queue_declare(q_name, auto_delete=False)
3022        self.addCleanup(lambda: self._connect().channel().queue_delete(q_name))
3023
3024        # Verify zero active consumers on the queue
3025        frame = ch.queue_declare(q_name, passive=True)
3026        self.assertEqual(frame.method.consumer_count, 0)
3027
3028        # Create consumer
3029        ch.basic_consume(q_name, lambda *args: None)
3030
3031        # Verify one active consumer on the queue now
3032        frame = ch.queue_declare(q_name, passive=True)
3033        self.assertEqual(frame.method.consumer_count, 1)
3034
3035        # Activate flow from default state (active by default)
3036        active = ch.flow(True)
3037        self.assertEqual(active, True)
3038
3039        # Verify still one active consumer on the queue now
3040        frame = ch.queue_declare(q_name, passive=True)
3041        self.assertEqual(frame.method.consumer_count, 1)
3042
3043
3044class TestChannelRaisesWrongStateWhenDeclaringQueueOnClosedChannel(BlockingTestCaseBase):
3045    def test(self):
3046        """BlockingConnection: Declaring queue on closed channel raises ChannelWrongStateError"""
3047        q_name = (
3048            'TestChannelRaisesWrongStateWhenDeclaringQueueOnClosedChannel_q' +
3049            uuid.uuid1().hex)
3050
3051        channel = self._connect().channel()
3052        channel.close()
3053        with self.assertRaises(pika.exceptions.ChannelWrongStateError):
3054            channel.queue_declare(q_name)
3055
3056
3057class TestChannelRaisesWrongStateWhenClosingClosedChannel(BlockingTestCaseBase):
3058    def test(self):
3059        """BlockingConnection: Closing closed channel raises ChannelWrongStateError"""
3060        channel = self._connect().channel()
3061        channel.close()
3062        with self.assertRaises(pika.exceptions.ChannelWrongStateError):
3063            channel.close()
3064
3065
3066class TestChannelContextManagerClosesChannel(BlockingTestCaseBase):
3067    def test(self):
3068        """BlockingConnection: channel context manager closes channel on exit"""
3069        with self._connect().channel() as channel:
3070            self.assertTrue(channel.is_open)
3071
3072        self.assertTrue(channel.is_closed)
3073
3074
3075class TestChannelContextManagerExitSurvivesClosedChannel(BlockingTestCaseBase):
3076    def test(self):
3077        """BlockingConnection: channel context manager exit survives closed channel"""
3078        with self._connect().channel() as channel:
3079            self.assertTrue(channel.is_open)
3080            channel.close()
3081            self.assertTrue(channel.is_closed)
3082
3083        self.assertTrue(channel.is_closed)
3084
3085
3086class TestChannelContextManagerDoesNotSuppressChannelClosedByBroker(
3087        BlockingTestCaseBase):
3088    def test(self):
3089        """BlockingConnection: channel context manager doesn't suppress ChannelClosedByBroker exception"""
3090
3091        exg_name = (
3092            "TestChannelContextManagerDoesNotSuppressChannelClosedByBroker" +
3093            uuid.uuid1().hex)
3094
3095        with self.assertRaises(pika.exceptions.ChannelClosedByBroker):
3096            with self._connect().channel() as channel:
3097                # Passively declaring non-existent exchange should force broker
3098                # to close channel
3099                channel.exchange_declare(exg_name, passive=True)
3100
3101        self.assertTrue(channel.is_closed)
3102
3103
3104if __name__ == '__main__':
3105    unittest.main()
3106