1from __future__ import absolute_import, unicode_literals
2
3import io
4import pytest
5import warnings
6import socket
7
8from case import MagicMock, Mock, patch
9
10from kombu import Connection
11from kombu.compression import compress
12from kombu.exceptions import ResourceError, ChannelError
13from kombu.transport import virtual
14from kombu.utils.uuid import uuid
15from kombu.five import PY3, monotonic
16
17PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print'
18
19
20def client(**kwargs):
21    return Connection(transport='kombu.transport.virtual:Transport', **kwargs)
22
23
24def memory_client():
25    return Connection(transport='memory')
26
27
28def test_BrokerState():
29    s = virtual.BrokerState()
30    assert hasattr(s, 'exchanges')
31
32    t = virtual.BrokerState(exchanges=16)
33    assert t.exchanges == 16
34
35
36class test_QoS:
37
38    def setup(self):
39        self.q = virtual.QoS(client().channel(), prefetch_count=10)
40
41    def teardown(self):
42        self.q._on_collect.cancel()
43
44    def test_constructor(self):
45        assert self.q.channel
46        assert self.q.prefetch_count
47        assert not self.q._delivered.restored
48        assert self.q._on_collect
49
50    def test_restore_visible__interface(self):
51        qos = virtual.QoS(client().channel())
52        qos.restore_visible()
53
54    def test_can_consume(self, stdouts):
55        stderr = io.StringIO()
56        _restored = []
57
58        class RestoreChannel(virtual.Channel):
59            do_restore = True
60
61            def _restore(self, message):
62                _restored.append(message)
63
64        assert self.q.can_consume()
65        for i in range(self.q.prefetch_count - 1):
66            self.q.append(i, uuid())
67            assert self.q.can_consume()
68        self.q.append(i + 1, uuid())
69        assert not self.q.can_consume()
70
71        tag1 = next(iter(self.q._delivered))
72        self.q.ack(tag1)
73        assert self.q.can_consume()
74
75        tag2 = uuid()
76        self.q.append(i + 2, tag2)
77        assert not self.q.can_consume()
78        self.q.reject(tag2)
79        assert self.q.can_consume()
80
81        self.q.channel = RestoreChannel(self.q.channel.connection)
82        tag3 = uuid()
83        self.q.append(i + 3, tag3)
84        self.q.reject(tag3, requeue=True)
85        self.q._flush()
86        assert self.q._delivered
87        assert not self.q._delivered.restored
88        self.q.restore_unacked_once(stderr=stderr)
89        assert _restored == [11, 9, 8, 7, 6, 5, 4, 3, 2, 1]
90        assert self.q._delivered.restored
91        assert not self.q._delivered
92
93        self.q.restore_unacked_once(stderr=stderr)
94        self.q._delivered.restored = False
95        self.q.restore_unacked_once(stderr=stderr)
96
97        assert stderr.getvalue()
98        assert not stdouts.stdout.getvalue()
99
100        self.q.restore_at_shutdown = False
101        self.q.restore_unacked_once()
102
103    def test_get(self):
104        self.q._delivered['foo'] = 1
105        assert self.q.get('foo') == 1
106
107
108class test_Message:
109
110    def test_create(self):
111        c = client().channel()
112        data = c.prepare_message('the quick brown fox...')
113        tag = data['properties']['delivery_tag'] = uuid()
114        message = c.message_to_python(data)
115        assert isinstance(message, virtual.Message)
116        assert message is c.message_to_python(message)
117        if message.errors:
118            message._reraise_error()
119
120        assert message.body == 'the quick brown fox...'.encode('utf-8')
121        assert message.delivery_tag, tag
122
123    def test_create_no_body(self):
124        virtual.Message(channel=Mock(), payload={
125            'body': None,
126            'properties': {'delivery_tag': 1},
127        })
128
129    def test_serializable(self):
130        c = client().channel()
131        body, content_type = compress('the quick brown fox...', 'gzip')
132        data = c.prepare_message(body, headers={'compression': content_type})
133        tag = data['properties']['delivery_tag'] = uuid()
134        message = c.message_to_python(data)
135        dict_ = message.serializable()
136        assert dict_['body'] == 'the quick brown fox...'.encode('utf-8')
137        assert dict_['properties']['delivery_tag'] == tag
138        assert 'compression' not in dict_['headers']
139
140
141class test_AbstractChannel:
142
143    def test_get(self):
144        with pytest.raises(NotImplementedError):
145            virtual.AbstractChannel()._get('queue')
146
147    def test_put(self):
148        with pytest.raises(NotImplementedError):
149            virtual.AbstractChannel()._put('queue', 'm')
150
151    def test_size(self):
152        assert virtual.AbstractChannel()._size('queue') == 0
153
154    def test_purge(self):
155        with pytest.raises(NotImplementedError):
156            virtual.AbstractChannel()._purge('queue')
157
158    def test_delete(self):
159        with pytest.raises(NotImplementedError):
160            virtual.AbstractChannel()._delete('queue')
161
162    def test_new_queue(self):
163        assert virtual.AbstractChannel()._new_queue('queue') is None
164
165    def test_has_queue(self):
166        assert virtual.AbstractChannel()._has_queue('queue')
167
168    def test_poll(self):
169        cycle = Mock(name='cycle')
170        assert virtual.AbstractChannel()._poll(cycle, Mock())
171        cycle.get.assert_called()
172
173
174class test_Channel:
175
176    def setup(self):
177        self.channel = client().channel()
178
179    def teardown(self):
180        if self.channel._qos is not None:
181            self.channel._qos._on_collect.cancel()
182
183    def test_exceeds_channel_max(self):
184        c = client()
185        t = c.transport
186        avail = t._avail_channel_ids = Mock(name='_avail_channel_ids')
187        avail.pop.side_effect = IndexError()
188        with pytest.raises(ResourceError):
189            virtual.Channel(t)
190
191    def test_exchange_bind_interface(self):
192        with pytest.raises(NotImplementedError):
193            self.channel.exchange_bind('dest', 'src', 'key')
194
195    def test_exchange_unbind_interface(self):
196        with pytest.raises(NotImplementedError):
197            self.channel.exchange_unbind('dest', 'src', 'key')
198
199    def test_queue_unbind_interface(self):
200        self.channel.queue_unbind('dest', 'ex', 'key')
201
202    def test_management(self):
203        m = self.channel.connection.client.get_manager()
204        assert m
205        m.get_bindings()
206        m.close()
207
208    def test_exchange_declare(self):
209        c = self.channel
210
211        with pytest.raises(ChannelError):
212            c.exchange_declare('test_exchange_declare', 'direct',
213                               durable=True, auto_delete=True, passive=True)
214        c.exchange_declare('test_exchange_declare', 'direct',
215                           durable=True, auto_delete=True)
216        c.exchange_declare('test_exchange_declare', 'direct',
217                           durable=True, auto_delete=True, passive=True)
218        assert 'test_exchange_declare' in c.state.exchanges
219        # can declare again with same values
220        c.exchange_declare('test_exchange_declare', 'direct',
221                           durable=True, auto_delete=True)
222        assert 'test_exchange_declare' in c.state.exchanges
223
224        # using different values raises NotEquivalentError
225        with pytest.raises(virtual.NotEquivalentError):
226            c.exchange_declare('test_exchange_declare', 'direct',
227                               durable=False, auto_delete=True)
228
229    def test_exchange_delete(self, ex='test_exchange_delete'):
230
231        class PurgeChannel(virtual.Channel):
232            purged = []
233
234            def _purge(self, queue):
235                self.purged.append(queue)
236
237        c = PurgeChannel(self.channel.connection)
238
239        c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
240        assert ex in c.state.exchanges
241        assert not c.state.has_binding(ex, ex, ex)  # no bindings yet
242        c.exchange_delete(ex)
243        assert ex not in c.state.exchanges
244
245        c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
246        c.queue_declare(ex)
247        c.queue_bind(ex, ex, ex)
248        assert c.state.has_binding(ex, ex, ex)
249        c.exchange_delete(ex)
250        assert not c.state.has_binding(ex, ex, ex)
251        assert ex in c.purged
252
253    def test_queue_delete__if_empty(self, n='test_queue_delete__if_empty'):
254        class PurgeChannel(virtual.Channel):
255            purged = []
256            size = 30
257
258            def _purge(self, queue):
259                self.purged.append(queue)
260
261            def _size(self, queue):
262                return self.size
263
264        c = PurgeChannel(self.channel.connection)
265        c.exchange_declare(n)
266        c.queue_declare(n)
267        c.queue_bind(n, n, n)
268        # tests code path that returns if queue already bound.
269        c.queue_bind(n, n, n)
270
271        c.queue_delete(n, if_empty=True)
272        assert c.state.has_binding(n, n, n)
273
274        c.size = 0
275        c.queue_delete(n, if_empty=True)
276        assert not c.state.has_binding(n, n, n)
277        assert n in c.purged
278
279    def test_queue_purge(self, n='test_queue_purge'):
280
281        class PurgeChannel(virtual.Channel):
282            purged = []
283
284            def _purge(self, queue):
285                self.purged.append(queue)
286
287        c = PurgeChannel(self.channel.connection)
288        c.exchange_declare(n)
289        c.queue_declare(n)
290        c.queue_bind(n, n, n)
291        c.queue_purge(n)
292        assert n in c.purged
293
294    def test_basic_publish__anon_exchange(self):
295        c = memory_client().channel()
296        msg = MagicMock(name='msg')
297        c.encode_body = Mock(name='c.encode_body')
298        c.encode_body.return_value = (1, 2)
299        c._put = Mock(name='c._put')
300        c.basic_publish(msg, None, 'rkey', kw=1)
301        c._put.assert_called_with('rkey', msg, kw=1)
302
303    def test_basic_publish_unique_delivery_tags(self, n='test_uniq_tag'):
304        c1 = memory_client().channel()
305        c2 = memory_client().channel()
306
307        for c in (c1, c2):
308            c.exchange_declare(n)
309            c.queue_declare(n)
310            c.queue_bind(n, n, n)
311        m1 = c1.prepare_message('George Costanza')
312        m2 = c2.prepare_message('Elaine Marie Benes')
313        c1.basic_publish(m1, n, n)
314        c2.basic_publish(m2, n, n)
315
316        r1 = c1.message_to_python(c1.basic_get(n))
317        r2 = c2.message_to_python(c2.basic_get(n))
318
319        assert r1.delivery_tag != r2.delivery_tag
320        with pytest.raises(ValueError):
321            int(r1.delivery_tag)
322        with pytest.raises(ValueError):
323            int(r2.delivery_tag)
324
325    def test_basic_publish__get__consume__restore(self,
326                                                  n='test_basic_publish'):
327        c = memory_client().channel()
328
329        c.exchange_declare(n)
330        c.queue_declare(n)
331        c.queue_bind(n, n, n)
332        c.queue_declare(n + '2')
333        c.queue_bind(n + '2', n, n)
334        messages = []
335        c.connection._deliver = Mock(name='_deliver')
336
337        def on_deliver(message, queue):
338            messages.append(message)
339        c.connection._deliver.side_effect = on_deliver
340
341        m = c.prepare_message('nthex quick brown fox...')
342        c.basic_publish(m, n, n)
343
344        r1 = c.message_to_python(c.basic_get(n))
345        assert r1
346        assert r1.body == 'nthex quick brown fox...'.encode('utf-8')
347        assert c.basic_get(n) is None
348
349        consumer_tag = uuid()
350
351        c.basic_consume(n + '2', False,
352                        consumer_tag=consumer_tag, callback=lambda *a: None)
353        assert n + '2' in c._active_queues
354        c.drain_events()
355        r2 = c.message_to_python(messages[-1])
356        assert r2.body == 'nthex quick brown fox...'.encode('utf-8')
357        assert r2.delivery_info['exchange'] == n
358        assert r2.delivery_info['routing_key'] == n
359        with pytest.raises(virtual.Empty):
360            c.drain_events()
361        c.basic_cancel(consumer_tag)
362
363        c._restore(r2)
364        r3 = c.message_to_python(c.basic_get(n))
365        assert r3
366        assert r3.body == 'nthex quick brown fox...'.encode('utf-8')
367        assert c.basic_get(n) is None
368
369    def test_basic_ack(self):
370
371        class MockQoS(virtual.QoS):
372            was_acked = False
373
374            def ack(self, delivery_tag):
375                self.was_acked = True
376
377        self.channel._qos = MockQoS(self.channel)
378        self.channel.basic_ack('foo')
379        assert self.channel._qos.was_acked
380
381    def test_basic_recover__requeue(self):
382
383        class MockQoS(virtual.QoS):
384            was_restored = False
385
386            def restore_unacked(self):
387                self.was_restored = True
388
389        self.channel._qos = MockQoS(self.channel)
390        self.channel.basic_recover(requeue=True)
391        assert self.channel._qos.was_restored
392
393    def test_restore_unacked_raises_BaseException(self):
394        q = self.channel.qos
395        q._flush = Mock()
396        q._delivered = {1: 1}
397
398        q.channel._restore = Mock()
399        q.channel._restore.side_effect = SystemExit
400
401        errors = q.restore_unacked()
402        assert isinstance(errors[0][0], SystemExit)
403        assert errors[0][1] == 1
404        assert not q._delivered
405
406    @patch('kombu.transport.virtual.base.emergency_dump_state')
407    @patch(PRINT_FQDN)
408    def test_restore_unacked_once_when_unrestored(self, print_,
409                                                  emergency_dump_state):
410        q = self.channel.qos
411        q._flush = Mock()
412
413        class State(dict):
414            restored = False
415
416        q._delivered = State({1: 1})
417        ru = q.restore_unacked = Mock()
418        exc = None
419        try:
420            raise KeyError()
421        except KeyError as exc_:
422            exc = exc_
423        ru.return_value = [(exc, 1)]
424
425        self.channel.do_restore = True
426        q.restore_unacked_once()
427        print_.assert_called()
428        emergency_dump_state.assert_called()
429
430    def test_basic_recover(self):
431        with pytest.raises(NotImplementedError):
432            self.channel.basic_recover(requeue=False)
433
434    def test_basic_reject(self):
435
436        class MockQoS(virtual.QoS):
437            was_rejected = False
438
439            def reject(self, delivery_tag, requeue=False):
440                self.was_rejected = True
441
442        self.channel._qos = MockQoS(self.channel)
443        self.channel.basic_reject('foo')
444        assert self.channel._qos.was_rejected
445
446    def test_basic_qos(self):
447        self.channel.basic_qos(prefetch_count=128)
448        assert self.channel._qos.prefetch_count == 128
449
450    def test_lookup__undeliverable(self, n='test_lookup__undeliverable'):
451        warnings.resetwarnings()
452        with warnings.catch_warnings(record=True) as log:
453            assert self.channel._lookup(n, n, 'ae.undeliver') == [
454                'ae.undeliver',
455            ]
456            assert log
457            assert 'could not be delivered' in log[0].message.args[0]
458
459    def test_context(self):
460        x = self.channel.__enter__()
461        assert x is self.channel
462        x.__exit__()
463        assert x.closed
464
465    def test_cycle_property(self):
466        assert self.channel.cycle
467
468    def test_flow(self):
469        with pytest.raises(NotImplementedError):
470            self.channel.flow(False)
471
472    def test_close_when_no_connection(self):
473        self.channel.connection = None
474        self.channel.close()
475        assert self.channel.closed
476
477    def test_drain_events_has_get_many(self):
478        c = self.channel
479        c._get_many = Mock()
480        c._poll = Mock()
481        c._consumers = [1]
482        c._qos = Mock()
483        c._qos.can_consume.return_value = True
484
485        c.drain_events(timeout=10.0)
486        c._get_many.assert_called_with(c._active_queues, timeout=10.0)
487
488    def test_get_exchanges(self):
489        self.channel.exchange_declare(exchange='unique_name')
490        assert self.channel.get_exchanges()
491
492    def test_basic_cancel_not_in_active_queues(self):
493        c = self.channel
494        c._consumers.add('x')
495        c._tag_to_queue['x'] = 'foo'
496        c._active_queues = Mock()
497        c._active_queues.remove.side_effect = ValueError()
498
499        c.basic_cancel('x')
500        c._active_queues.remove.assert_called_with('foo')
501
502    def test_basic_cancel_unknown_ctag(self):
503        assert self.channel.basic_cancel('unknown-tag') is None
504
505    def test_list_bindings(self):
506        c = self.channel
507        c.exchange_declare(exchange='unique_name')
508        c.queue_declare(queue='q')
509        c.queue_bind(queue='q', exchange='unique_name', routing_key='rk')
510
511        assert ('q', 'unique_name', 'rk') in list(c.list_bindings())
512
513    def test_after_reply_message_received(self):
514        c = self.channel
515        c.queue_delete = Mock()
516        c.after_reply_message_received('foo')
517        c.queue_delete.assert_called_with('foo')
518
519    def test_queue_delete_unknown_queue(self):
520        assert self.channel.queue_delete('xiwjqjwel') is None
521
522    def test_queue_declare_passive(self):
523        has_queue = self.channel._has_queue = Mock()
524        has_queue.return_value = False
525        with pytest.raises(ChannelError):
526            self.channel.queue_declare(queue='21wisdjwqe', passive=True)
527
528    def test_get_message_priority(self):
529
530        def _message(priority):
531            return self.channel.prepare_message(
532                'the message with priority', priority=priority,
533            )
534
535        assert self.channel._get_message_priority(_message(5)) == 5
536        assert self.channel._get_message_priority(
537            _message(self.channel.min_priority - 10)
538        ) == self.channel.min_priority
539        assert self.channel._get_message_priority(
540            _message(self.channel.max_priority + 10),
541        ) == self.channel.max_priority
542        assert self.channel._get_message_priority(
543            _message('foobar'),
544        ) == self.channel.default_priority
545        assert self.channel._get_message_priority(
546            _message(2), reverse=True,
547        ) == self.channel.max_priority - 2
548
549
550class test_Transport:
551
552    def setup(self):
553        self.transport = client().transport
554
555    def test_custom_polling_interval(self):
556        x = client(transport_options={'polling_interval': 32.3})
557        assert x.transport.polling_interval == 32.3
558
559    def test_timeout_over_polling_interval(self):
560        x = client(transport_options=dict(polling_interval=60))
561        start = monotonic()
562        with pytest.raises(socket.timeout):
563            x.transport.drain_events(x, timeout=.5)
564            assert monotonic() - start < 60
565
566    def test_close_connection(self):
567        c1 = self.transport.create_channel(self.transport)
568        c2 = self.transport.create_channel(self.transport)
569        assert len(self.transport.channels) == 2
570        self.transport.close_connection(self.transport)
571        assert not self.transport.channels
572        del(c1)  # so pyflakes doesn't complain
573        del(c2)
574
575    def test_drain_channel(self):
576        channel = self.transport.create_channel(self.transport)
577        with pytest.raises(virtual.Empty):
578            self.transport._drain_channel(channel, Mock())
579
580    def test__deliver__no_queue(self):
581        with pytest.raises(KeyError):
582            self.transport._deliver(Mock(name='msg'), queue=None)
583
584    def test__reject_inbound_message(self):
585        channel = Mock(name='channel')
586        self.transport.channels = [None, channel]
587        self.transport._reject_inbound_message({'foo': 'bar'})
588        channel.Message.assert_called_with({'foo': 'bar'}, channel=channel)
589        channel.qos.append.assert_called_with(
590            channel.Message(), channel.Message().delivery_tag,
591        )
592        channel.basic_reject.assert_called_with(
593            channel.Message().delivery_tag, requeue=True,
594        )
595
596    def test_on_message_ready(self):
597        channel = Mock(name='channel')
598        msg = Mock(name='msg')
599        callback = Mock(name='callback')
600        self.transport._callbacks = {'q1': callback}
601        self.transport.on_message_ready(channel, msg, queue='q1')
602        callback.assert_called_with(msg)
603
604    def test_on_message_ready__no_queue(self):
605        with pytest.raises(KeyError):
606            self.transport.on_message_ready(
607                Mock(name='channel'), Mock(name='msg'), queue=None)
608
609    def test_on_message_ready__no_callback(self):
610        self.transport._callbacks = {}
611        with pytest.raises(KeyError):
612            self.transport.on_message_ready(
613                Mock(name='channel'), Mock(name='msg'), queue='q1')
614