1from __future__ import absolute_import, unicode_literals 2 3import io 4import pytest 5import warnings 6import socket 7 8from case import MagicMock, Mock, patch 9 10from kombu import Connection 11from kombu.compression import compress 12from kombu.exceptions import ResourceError, ChannelError 13from kombu.transport import virtual 14from kombu.utils.uuid import uuid 15from kombu.five import PY3, monotonic 16 17PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print' 18 19 20def client(**kwargs): 21 return Connection(transport='kombu.transport.virtual:Transport', **kwargs) 22 23 24def memory_client(): 25 return Connection(transport='memory') 26 27 28def test_BrokerState(): 29 s = virtual.BrokerState() 30 assert hasattr(s, 'exchanges') 31 32 t = virtual.BrokerState(exchanges=16) 33 assert t.exchanges == 16 34 35 36class test_QoS: 37 38 def setup(self): 39 self.q = virtual.QoS(client().channel(), prefetch_count=10) 40 41 def teardown(self): 42 self.q._on_collect.cancel() 43 44 def test_constructor(self): 45 assert self.q.channel 46 assert self.q.prefetch_count 47 assert not self.q._delivered.restored 48 assert self.q._on_collect 49 50 def test_restore_visible__interface(self): 51 qos = virtual.QoS(client().channel()) 52 qos.restore_visible() 53 54 def test_can_consume(self, stdouts): 55 stderr = io.StringIO() 56 _restored = [] 57 58 class RestoreChannel(virtual.Channel): 59 do_restore = True 60 61 def _restore(self, message): 62 _restored.append(message) 63 64 assert self.q.can_consume() 65 for i in range(self.q.prefetch_count - 1): 66 self.q.append(i, uuid()) 67 assert self.q.can_consume() 68 self.q.append(i + 1, uuid()) 69 assert not self.q.can_consume() 70 71 tag1 = next(iter(self.q._delivered)) 72 self.q.ack(tag1) 73 assert self.q.can_consume() 74 75 tag2 = uuid() 76 self.q.append(i + 2, tag2) 77 assert not self.q.can_consume() 78 self.q.reject(tag2) 79 assert self.q.can_consume() 80 81 self.q.channel = RestoreChannel(self.q.channel.connection) 82 tag3 = uuid() 83 self.q.append(i + 3, tag3) 84 self.q.reject(tag3, requeue=True) 85 self.q._flush() 86 assert self.q._delivered 87 assert not self.q._delivered.restored 88 self.q.restore_unacked_once(stderr=stderr) 89 assert _restored == [11, 9, 8, 7, 6, 5, 4, 3, 2, 1] 90 assert self.q._delivered.restored 91 assert not self.q._delivered 92 93 self.q.restore_unacked_once(stderr=stderr) 94 self.q._delivered.restored = False 95 self.q.restore_unacked_once(stderr=stderr) 96 97 assert stderr.getvalue() 98 assert not stdouts.stdout.getvalue() 99 100 self.q.restore_at_shutdown = False 101 self.q.restore_unacked_once() 102 103 def test_get(self): 104 self.q._delivered['foo'] = 1 105 assert self.q.get('foo') == 1 106 107 108class test_Message: 109 110 def test_create(self): 111 c = client().channel() 112 data = c.prepare_message('the quick brown fox...') 113 tag = data['properties']['delivery_tag'] = uuid() 114 message = c.message_to_python(data) 115 assert isinstance(message, virtual.Message) 116 assert message is c.message_to_python(message) 117 if message.errors: 118 message._reraise_error() 119 120 assert message.body == 'the quick brown fox...'.encode('utf-8') 121 assert message.delivery_tag, tag 122 123 def test_create_no_body(self): 124 virtual.Message(channel=Mock(), payload={ 125 'body': None, 126 'properties': {'delivery_tag': 1}, 127 }) 128 129 def test_serializable(self): 130 c = client().channel() 131 body, content_type = compress('the quick brown fox...', 'gzip') 132 data = c.prepare_message(body, headers={'compression': content_type}) 133 tag = data['properties']['delivery_tag'] = uuid() 134 message = c.message_to_python(data) 135 dict_ = message.serializable() 136 assert dict_['body'] == 'the quick brown fox...'.encode('utf-8') 137 assert dict_['properties']['delivery_tag'] == tag 138 assert 'compression' not in dict_['headers'] 139 140 141class test_AbstractChannel: 142 143 def test_get(self): 144 with pytest.raises(NotImplementedError): 145 virtual.AbstractChannel()._get('queue') 146 147 def test_put(self): 148 with pytest.raises(NotImplementedError): 149 virtual.AbstractChannel()._put('queue', 'm') 150 151 def test_size(self): 152 assert virtual.AbstractChannel()._size('queue') == 0 153 154 def test_purge(self): 155 with pytest.raises(NotImplementedError): 156 virtual.AbstractChannel()._purge('queue') 157 158 def test_delete(self): 159 with pytest.raises(NotImplementedError): 160 virtual.AbstractChannel()._delete('queue') 161 162 def test_new_queue(self): 163 assert virtual.AbstractChannel()._new_queue('queue') is None 164 165 def test_has_queue(self): 166 assert virtual.AbstractChannel()._has_queue('queue') 167 168 def test_poll(self): 169 cycle = Mock(name='cycle') 170 assert virtual.AbstractChannel()._poll(cycle, Mock()) 171 cycle.get.assert_called() 172 173 174class test_Channel: 175 176 def setup(self): 177 self.channel = client().channel() 178 179 def teardown(self): 180 if self.channel._qos is not None: 181 self.channel._qos._on_collect.cancel() 182 183 def test_exceeds_channel_max(self): 184 c = client() 185 t = c.transport 186 avail = t._avail_channel_ids = Mock(name='_avail_channel_ids') 187 avail.pop.side_effect = IndexError() 188 with pytest.raises(ResourceError): 189 virtual.Channel(t) 190 191 def test_exchange_bind_interface(self): 192 with pytest.raises(NotImplementedError): 193 self.channel.exchange_bind('dest', 'src', 'key') 194 195 def test_exchange_unbind_interface(self): 196 with pytest.raises(NotImplementedError): 197 self.channel.exchange_unbind('dest', 'src', 'key') 198 199 def test_queue_unbind_interface(self): 200 self.channel.queue_unbind('dest', 'ex', 'key') 201 202 def test_management(self): 203 m = self.channel.connection.client.get_manager() 204 assert m 205 m.get_bindings() 206 m.close() 207 208 def test_exchange_declare(self): 209 c = self.channel 210 211 with pytest.raises(ChannelError): 212 c.exchange_declare('test_exchange_declare', 'direct', 213 durable=True, auto_delete=True, passive=True) 214 c.exchange_declare('test_exchange_declare', 'direct', 215 durable=True, auto_delete=True) 216 c.exchange_declare('test_exchange_declare', 'direct', 217 durable=True, auto_delete=True, passive=True) 218 assert 'test_exchange_declare' in c.state.exchanges 219 # can declare again with same values 220 c.exchange_declare('test_exchange_declare', 'direct', 221 durable=True, auto_delete=True) 222 assert 'test_exchange_declare' in c.state.exchanges 223 224 # using different values raises NotEquivalentError 225 with pytest.raises(virtual.NotEquivalentError): 226 c.exchange_declare('test_exchange_declare', 'direct', 227 durable=False, auto_delete=True) 228 229 def test_exchange_delete(self, ex='test_exchange_delete'): 230 231 class PurgeChannel(virtual.Channel): 232 purged = [] 233 234 def _purge(self, queue): 235 self.purged.append(queue) 236 237 c = PurgeChannel(self.channel.connection) 238 239 c.exchange_declare(ex, 'direct', durable=True, auto_delete=True) 240 assert ex in c.state.exchanges 241 assert not c.state.has_binding(ex, ex, ex) # no bindings yet 242 c.exchange_delete(ex) 243 assert ex not in c.state.exchanges 244 245 c.exchange_declare(ex, 'direct', durable=True, auto_delete=True) 246 c.queue_declare(ex) 247 c.queue_bind(ex, ex, ex) 248 assert c.state.has_binding(ex, ex, ex) 249 c.exchange_delete(ex) 250 assert not c.state.has_binding(ex, ex, ex) 251 assert ex in c.purged 252 253 def test_queue_delete__if_empty(self, n='test_queue_delete__if_empty'): 254 class PurgeChannel(virtual.Channel): 255 purged = [] 256 size = 30 257 258 def _purge(self, queue): 259 self.purged.append(queue) 260 261 def _size(self, queue): 262 return self.size 263 264 c = PurgeChannel(self.channel.connection) 265 c.exchange_declare(n) 266 c.queue_declare(n) 267 c.queue_bind(n, n, n) 268 # tests code path that returns if queue already bound. 269 c.queue_bind(n, n, n) 270 271 c.queue_delete(n, if_empty=True) 272 assert c.state.has_binding(n, n, n) 273 274 c.size = 0 275 c.queue_delete(n, if_empty=True) 276 assert not c.state.has_binding(n, n, n) 277 assert n in c.purged 278 279 def test_queue_purge(self, n='test_queue_purge'): 280 281 class PurgeChannel(virtual.Channel): 282 purged = [] 283 284 def _purge(self, queue): 285 self.purged.append(queue) 286 287 c = PurgeChannel(self.channel.connection) 288 c.exchange_declare(n) 289 c.queue_declare(n) 290 c.queue_bind(n, n, n) 291 c.queue_purge(n) 292 assert n in c.purged 293 294 def test_basic_publish__anon_exchange(self): 295 c = memory_client().channel() 296 msg = MagicMock(name='msg') 297 c.encode_body = Mock(name='c.encode_body') 298 c.encode_body.return_value = (1, 2) 299 c._put = Mock(name='c._put') 300 c.basic_publish(msg, None, 'rkey', kw=1) 301 c._put.assert_called_with('rkey', msg, kw=1) 302 303 def test_basic_publish_unique_delivery_tags(self, n='test_uniq_tag'): 304 c1 = memory_client().channel() 305 c2 = memory_client().channel() 306 307 for c in (c1, c2): 308 c.exchange_declare(n) 309 c.queue_declare(n) 310 c.queue_bind(n, n, n) 311 m1 = c1.prepare_message('George Costanza') 312 m2 = c2.prepare_message('Elaine Marie Benes') 313 c1.basic_publish(m1, n, n) 314 c2.basic_publish(m2, n, n) 315 316 r1 = c1.message_to_python(c1.basic_get(n)) 317 r2 = c2.message_to_python(c2.basic_get(n)) 318 319 assert r1.delivery_tag != r2.delivery_tag 320 with pytest.raises(ValueError): 321 int(r1.delivery_tag) 322 with pytest.raises(ValueError): 323 int(r2.delivery_tag) 324 325 def test_basic_publish__get__consume__restore(self, 326 n='test_basic_publish'): 327 c = memory_client().channel() 328 329 c.exchange_declare(n) 330 c.queue_declare(n) 331 c.queue_bind(n, n, n) 332 c.queue_declare(n + '2') 333 c.queue_bind(n + '2', n, n) 334 messages = [] 335 c.connection._deliver = Mock(name='_deliver') 336 337 def on_deliver(message, queue): 338 messages.append(message) 339 c.connection._deliver.side_effect = on_deliver 340 341 m = c.prepare_message('nthex quick brown fox...') 342 c.basic_publish(m, n, n) 343 344 r1 = c.message_to_python(c.basic_get(n)) 345 assert r1 346 assert r1.body == 'nthex quick brown fox...'.encode('utf-8') 347 assert c.basic_get(n) is None 348 349 consumer_tag = uuid() 350 351 c.basic_consume(n + '2', False, 352 consumer_tag=consumer_tag, callback=lambda *a: None) 353 assert n + '2' in c._active_queues 354 c.drain_events() 355 r2 = c.message_to_python(messages[-1]) 356 assert r2.body == 'nthex quick brown fox...'.encode('utf-8') 357 assert r2.delivery_info['exchange'] == n 358 assert r2.delivery_info['routing_key'] == n 359 with pytest.raises(virtual.Empty): 360 c.drain_events() 361 c.basic_cancel(consumer_tag) 362 363 c._restore(r2) 364 r3 = c.message_to_python(c.basic_get(n)) 365 assert r3 366 assert r3.body == 'nthex quick brown fox...'.encode('utf-8') 367 assert c.basic_get(n) is None 368 369 def test_basic_ack(self): 370 371 class MockQoS(virtual.QoS): 372 was_acked = False 373 374 def ack(self, delivery_tag): 375 self.was_acked = True 376 377 self.channel._qos = MockQoS(self.channel) 378 self.channel.basic_ack('foo') 379 assert self.channel._qos.was_acked 380 381 def test_basic_recover__requeue(self): 382 383 class MockQoS(virtual.QoS): 384 was_restored = False 385 386 def restore_unacked(self): 387 self.was_restored = True 388 389 self.channel._qos = MockQoS(self.channel) 390 self.channel.basic_recover(requeue=True) 391 assert self.channel._qos.was_restored 392 393 def test_restore_unacked_raises_BaseException(self): 394 q = self.channel.qos 395 q._flush = Mock() 396 q._delivered = {1: 1} 397 398 q.channel._restore = Mock() 399 q.channel._restore.side_effect = SystemExit 400 401 errors = q.restore_unacked() 402 assert isinstance(errors[0][0], SystemExit) 403 assert errors[0][1] == 1 404 assert not q._delivered 405 406 @patch('kombu.transport.virtual.base.emergency_dump_state') 407 @patch(PRINT_FQDN) 408 def test_restore_unacked_once_when_unrestored(self, print_, 409 emergency_dump_state): 410 q = self.channel.qos 411 q._flush = Mock() 412 413 class State(dict): 414 restored = False 415 416 q._delivered = State({1: 1}) 417 ru = q.restore_unacked = Mock() 418 exc = None 419 try: 420 raise KeyError() 421 except KeyError as exc_: 422 exc = exc_ 423 ru.return_value = [(exc, 1)] 424 425 self.channel.do_restore = True 426 q.restore_unacked_once() 427 print_.assert_called() 428 emergency_dump_state.assert_called() 429 430 def test_basic_recover(self): 431 with pytest.raises(NotImplementedError): 432 self.channel.basic_recover(requeue=False) 433 434 def test_basic_reject(self): 435 436 class MockQoS(virtual.QoS): 437 was_rejected = False 438 439 def reject(self, delivery_tag, requeue=False): 440 self.was_rejected = True 441 442 self.channel._qos = MockQoS(self.channel) 443 self.channel.basic_reject('foo') 444 assert self.channel._qos.was_rejected 445 446 def test_basic_qos(self): 447 self.channel.basic_qos(prefetch_count=128) 448 assert self.channel._qos.prefetch_count == 128 449 450 def test_lookup__undeliverable(self, n='test_lookup__undeliverable'): 451 warnings.resetwarnings() 452 with warnings.catch_warnings(record=True) as log: 453 assert self.channel._lookup(n, n, 'ae.undeliver') == [ 454 'ae.undeliver', 455 ] 456 assert log 457 assert 'could not be delivered' in log[0].message.args[0] 458 459 def test_context(self): 460 x = self.channel.__enter__() 461 assert x is self.channel 462 x.__exit__() 463 assert x.closed 464 465 def test_cycle_property(self): 466 assert self.channel.cycle 467 468 def test_flow(self): 469 with pytest.raises(NotImplementedError): 470 self.channel.flow(False) 471 472 def test_close_when_no_connection(self): 473 self.channel.connection = None 474 self.channel.close() 475 assert self.channel.closed 476 477 def test_drain_events_has_get_many(self): 478 c = self.channel 479 c._get_many = Mock() 480 c._poll = Mock() 481 c._consumers = [1] 482 c._qos = Mock() 483 c._qos.can_consume.return_value = True 484 485 c.drain_events(timeout=10.0) 486 c._get_many.assert_called_with(c._active_queues, timeout=10.0) 487 488 def test_get_exchanges(self): 489 self.channel.exchange_declare(exchange='unique_name') 490 assert self.channel.get_exchanges() 491 492 def test_basic_cancel_not_in_active_queues(self): 493 c = self.channel 494 c._consumers.add('x') 495 c._tag_to_queue['x'] = 'foo' 496 c._active_queues = Mock() 497 c._active_queues.remove.side_effect = ValueError() 498 499 c.basic_cancel('x') 500 c._active_queues.remove.assert_called_with('foo') 501 502 def test_basic_cancel_unknown_ctag(self): 503 assert self.channel.basic_cancel('unknown-tag') is None 504 505 def test_list_bindings(self): 506 c = self.channel 507 c.exchange_declare(exchange='unique_name') 508 c.queue_declare(queue='q') 509 c.queue_bind(queue='q', exchange='unique_name', routing_key='rk') 510 511 assert ('q', 'unique_name', 'rk') in list(c.list_bindings()) 512 513 def test_after_reply_message_received(self): 514 c = self.channel 515 c.queue_delete = Mock() 516 c.after_reply_message_received('foo') 517 c.queue_delete.assert_called_with('foo') 518 519 def test_queue_delete_unknown_queue(self): 520 assert self.channel.queue_delete('xiwjqjwel') is None 521 522 def test_queue_declare_passive(self): 523 has_queue = self.channel._has_queue = Mock() 524 has_queue.return_value = False 525 with pytest.raises(ChannelError): 526 self.channel.queue_declare(queue='21wisdjwqe', passive=True) 527 528 def test_get_message_priority(self): 529 530 def _message(priority): 531 return self.channel.prepare_message( 532 'the message with priority', priority=priority, 533 ) 534 535 assert self.channel._get_message_priority(_message(5)) == 5 536 assert self.channel._get_message_priority( 537 _message(self.channel.min_priority - 10) 538 ) == self.channel.min_priority 539 assert self.channel._get_message_priority( 540 _message(self.channel.max_priority + 10), 541 ) == self.channel.max_priority 542 assert self.channel._get_message_priority( 543 _message('foobar'), 544 ) == self.channel.default_priority 545 assert self.channel._get_message_priority( 546 _message(2), reverse=True, 547 ) == self.channel.max_priority - 2 548 549 550class test_Transport: 551 552 def setup(self): 553 self.transport = client().transport 554 555 def test_custom_polling_interval(self): 556 x = client(transport_options={'polling_interval': 32.3}) 557 assert x.transport.polling_interval == 32.3 558 559 def test_timeout_over_polling_interval(self): 560 x = client(transport_options=dict(polling_interval=60)) 561 start = monotonic() 562 with pytest.raises(socket.timeout): 563 x.transport.drain_events(x, timeout=.5) 564 assert monotonic() - start < 60 565 566 def test_close_connection(self): 567 c1 = self.transport.create_channel(self.transport) 568 c2 = self.transport.create_channel(self.transport) 569 assert len(self.transport.channels) == 2 570 self.transport.close_connection(self.transport) 571 assert not self.transport.channels 572 del(c1) # so pyflakes doesn't complain 573 del(c2) 574 575 def test_drain_channel(self): 576 channel = self.transport.create_channel(self.transport) 577 with pytest.raises(virtual.Empty): 578 self.transport._drain_channel(channel, Mock()) 579 580 def test__deliver__no_queue(self): 581 with pytest.raises(KeyError): 582 self.transport._deliver(Mock(name='msg'), queue=None) 583 584 def test__reject_inbound_message(self): 585 channel = Mock(name='channel') 586 self.transport.channels = [None, channel] 587 self.transport._reject_inbound_message({'foo': 'bar'}) 588 channel.Message.assert_called_with({'foo': 'bar'}, channel=channel) 589 channel.qos.append.assert_called_with( 590 channel.Message(), channel.Message().delivery_tag, 591 ) 592 channel.basic_reject.assert_called_with( 593 channel.Message().delivery_tag, requeue=True, 594 ) 595 596 def test_on_message_ready(self): 597 channel = Mock(name='channel') 598 msg = Mock(name='msg') 599 callback = Mock(name='callback') 600 self.transport._callbacks = {'q1': callback} 601 self.transport.on_message_ready(channel, msg, queue='q1') 602 callback.assert_called_with(msg) 603 604 def test_on_message_ready__no_queue(self): 605 with pytest.raises(KeyError): 606 self.transport.on_message_ready( 607 Mock(name='channel'), Mock(name='msg'), queue=None) 608 609 def test_on_message_ready__no_callback(self): 610 self.transport._callbacks = {} 611 with pytest.raises(KeyError): 612 self.transport.on_message_ready( 613 Mock(name='channel'), Mock(name='msg'), queue='q1') 614