#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from twisted.internet.defer import inlineCallbacks

from txamqp.client import ConnectionClosed
from txamqp.queue import Empty
from txamqp.content import Content
from txamqp.testlib import TestBase, SupportedBrokers, QPID, OPENAMQ


def _numeric_version(version):
    """
    Return the leading numeric components of a dotted version string.

    "3.0.0" gives (3, 0, 0). Parsing stops at the first component that is
    not purely numeric, keeping its leading digits if it has any, so
    "3.0.0-rc.1" also gives (3, 0, 0) instead of raising ValueError.
    """
    components = []
    for piece in version.split("."):
        digits = ""
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break
        components.append(int(digits))
        if digits != piece:
            # A suffix such as "-rc" ends the numeric part of the version.
            break
    return tuple(components)


class ASaslPlainAuthenticationTest(TestBase):
    """Test for SASL PLAIN authentication Broker functionality"""

    @inlineCallbacks
    def authenticate(self, client, user, password):
        """Authenticate `client` with the PLAIN mechanism."""
        # NOTE(review): presumably called by TestBase while connecting;
        # confirm against txamqp.testlib.
        yield client.authenticate(user, password, mechanism='PLAIN')

    @inlineCallbacks
    def test_sasl_plain(self):
        """Open and close a channel on the authenticated connection."""
        channel = yield self.client.channel(200)
        yield channel.channel_open()
        yield channel.channel_close()


class ASaslAmqPlainAuthenticationTest(TestBase):
    """Test for SASL AMQPLAIN authentication Broker functionality"""

    @inlineCallbacks
    def authenticate(self, client, user, password):
        """Authenticate `client` with the AMQPLAIN mechanism."""
        # NOTE(review): presumably called by TestBase while connecting;
        # confirm against txamqp.testlib.
        yield client.authenticate(user, password, mechanism='AMQPLAIN')

    @inlineCallbacks
    def test_sasl_amq_plain(self):
        """Open and close a channel on the authenticated connection."""
        channel = yield self.client.channel(200)
        yield channel.channel_open()
        yield channel.channel_close()


class BrokerTests(TestBase):
    """Tests for basic Broker functionality"""

    @inlineCallbacks
    def test_amqp_basic_13(self):
        """
        First, this test tries to receive a message with a no-ack
        consumer. Second, this test tries to explicitly receive and
        acknowledge a message with an acknowledging consumer.
        """
        ch = self.channel
        yield self.queue_declare(ch, queue="myqueue")

        # No ack consumer
        ctag = (yield ch.basic_consume(queue="myqueue", no_ack=True)).consumer_tag
        body = "test no-ack"
        ch.basic_publish(routing_key="myqueue", content=Content(body))
        msg = yield ((yield self.client.queue(ctag)).get(timeout=5))
        self.assertEqual(body, msg.content.body)

        # Acknowledging consumer
        yield self.queue_declare(ch, queue="otherqueue")
        ctag = (yield ch.basic_consume(queue="otherqueue", no_ack=False)).consumer_tag
        body = "test ack"
        ch.basic_publish(routing_key="otherqueue", content=Content(body))
        msg = yield ((yield self.client.queue(ctag)).get(timeout=5))
        ch.basic_ack(delivery_tag=msg.delivery_tag)
        self.assertEqual(body, msg.content.body)

    @inlineCallbacks
    def test_basic_delivery_immediate(self):
        """
        Test basic message delivery where consume is issued before publish.

        Will be skipped for RabbitMQ 3.0 or higher, since support for the
        'immediate' flag was removed, see:

        http://www.rabbitmq.com/blog/2012/11/19/breaking-things-with-rabbitmq-3-0
        """
        server_properties = self.client.delegate.server_properties
        if server_properties["product"] == "RabbitMQ":
            # Tolerate non-numeric suffixes (e.g. "3.0.0-rc.1") so the test
            # is skipped rather than erroring out with ValueError.
            version = _numeric_version(server_properties["version"])
            if version >= (3, 0, 0):
                self.skipTest("Not supported for this broker.")

        channel = self.channel
        yield self.exchange_declare(channel, exchange="test-exchange", type="direct")
        yield self.queue_declare(channel, queue="test-queue")
        yield channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
        reply = yield channel.basic_consume(queue="test-queue", no_ack=True)
        queue = yield self.client.queue(reply.consumer_tag)

        body = "Immediate Delivery"
        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
        msg = yield queue.get(timeout=5)
        self.assertEqual(body, msg.content.body)

        # TODO: Ensure we fail if immediate=True and there's no consumer.

    @inlineCallbacks
    def test_basic_delivery_queued(self):
        """
        Test basic message delivery where publish is issued before consume
        (i.e. requires queueing of the message)
        """
        channel = self.channel
        yield self.exchange_declare(channel, exchange="test-exchange", type="direct")
        yield self.queue_declare(channel, queue="test-queue")
        yield channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
        body = "Queued Delivery"
        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
        reply = yield channel.basic_consume(queue="test-queue", no_ack=True)
        queue = yield self.client.queue(reply.consumer_tag)
        msg = yield queue.get(timeout=5)
        self.assertEqual(body, msg.content.body)

    @inlineCallbacks
    def test_invalid_channel(self):
        """
        Declare a queue on a channel that was never opened and expect the
        connection to be closed with reply code 504.
        """
        channel = yield self.client.channel(200)
        try:
            yield channel.queue_declare(exclusive=True)
            self.fail("Expected error on queue_declare for invalid channel")
        except ConnectionClosed as e:
            self.assertConnectionException(504, e.args[0])

    @inlineCallbacks
    def test_closed_channel(self):
        """
        Declare a queue on a channel that was opened and then closed, and
        expect the connection to be closed with reply code 504.
        """
        channel = yield self.client.channel(200)
        yield channel.channel_open()
        yield channel.channel_close()
        try:
            yield channel.queue_declare(exclusive=True)
            self.fail("Expected error on queue_declare for closed channel")
        except ConnectionClosed as e:
            self.assertConnectionException(504, e.args[0])

    @SupportedBrokers(QPID, OPENAMQ)
    @inlineCallbacks
    def test_channel_flow(self):
        """
        Check that no message is delivered while channel flow is turned off
        and that the published message arrives once flow is turned back on.
        """
        channel = self.channel
        yield channel.queue_declare(queue="flow_test_queue", exclusive=True)
        yield channel.basic_consume(consumer_tag="my-tag", queue="flow_test_queue")
        incoming = yield self.client.queue("my-tag")

        yield channel.channel_flow(active=False)
        channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
        try:
            yield incoming.get(timeout=1)
            self.fail("Received message when flow turned off.")
        except Empty:
            # Expected: nothing may be delivered while flow is inactive.
            pass

        yield channel.channel_flow(active=True)
        msg = yield incoming.get(timeout=1)
        self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)

    @inlineCallbacks
    def test_close_cleanly(self):
        """
        Test closing a client cleanly, by sending 'close' and waiting for
        'close-ok'.
        """
        yield self.client.close(within=5)
        yield self.client.disconnected.wait()
        self.assertTrue(self.client.closed)