1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements.  See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership.  The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License.  You may obtain a copy of the License at
9#
10#   http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied.  See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19from twisted.internet.defer import inlineCallbacks
20
21from txamqp.client import ConnectionClosed
22from txamqp.queue import Empty
23from txamqp.content import Content
24from txamqp.testlib import TestBase, SupportedBrokers, QPID, OPENAMQ
25
26
27class ASaslPlainAuthenticationTest(TestBase):
28    """Test for SASL PLAIN authentication Broker functionality"""
29
30    @inlineCallbacks
31    def authenticate(self, client, user, password):
32        yield client.authenticate(user, password, mechanism='PLAIN')
33
34    @inlineCallbacks
35    def test_sasl_plain(self):
36        channel = yield self.client.channel(200)
37        yield channel.channel_open()
38        yield channel.channel_close()
39
40
41class ASaslAmqPlainAuthenticationTest(TestBase):
42    """Test for SASL AMQPLAIN authentication Broker functionality"""
43
44    @inlineCallbacks
45    def authenticate(self, client, user, password):
46        yield client.authenticate(user, password, mechanism='AMQPLAIN')
47
48    @inlineCallbacks
49    def test_sasl_amq_plain(self):
50        channel = yield self.client.channel(200)
51        yield channel.channel_open()
52        yield channel.channel_close()
53
54
55class BrokerTests(TestBase):
56    """Tests for basic Broker functionality"""
57
58    @inlineCallbacks
59    def test_amqp_basic_13(self):
60        """
61        First, this test tries to receive a message with a no-ack
62        consumer. Second, this test tries to explicitely receive and
63        acknowledge a message with an acknowledging consumer.
64        """
65        ch = self.channel
66        yield self.queue_declare(ch, queue="myqueue")
67
68        # No ack consumer
69        ctag = (yield ch.basic_consume(queue="myqueue", no_ack=True)).consumer_tag
70        body = "test no-ack"
71        ch.basic_publish(routing_key="myqueue", content=Content(body))
72        msg = yield ((yield self.client.queue(ctag)).get(timeout=5))
73        self.assert_(msg.content.body == body)
74
75        # Acknowleding consumer
76        yield self.queue_declare(ch, queue="otherqueue")
77        ctag = (yield ch.basic_consume(queue="otherqueue", no_ack=False)).consumer_tag
78        body = "test ack"
79        ch.basic_publish(routing_key="otherqueue", content=Content(body))
80        msg = yield ((yield self.client.queue(ctag)).get(timeout=5))
81        ch.basic_ack(delivery_tag=msg.delivery_tag)
82        self.assert_(msg.content.body == body)
83
84    @inlineCallbacks
85    def test_basic_delivery_immediate(self):
86        """
87        Test basic message delivery where consume is issued before publish.
88
89        Will be skipped for RabbitMQ 3.0 or higher, since support for the
90        'immediate' flag was removed, see:
91
92        http://www.rabbitmq.com/blog/2012/11/19/breaking-things-with-rabbitmq-3-0
93        """
94        server_properties = self.client.delegate.server_properties
95        if server_properties["product"] == "RabbitMQ":
96            version = tuple(map(int, server_properties["version"].split(".")))
97            if version >= (3, 0, 0):
98                self.skipTest("Not supported for this broker.")
99
100        channel = self.channel
101        yield self.exchange_declare(channel, exchange="test-exchange", type="direct")
102        yield self.queue_declare(channel, queue="test-queue")
103        yield channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
104        reply = yield channel.basic_consume(queue="test-queue", no_ack=True)
105        queue = yield self.client.queue(reply.consumer_tag)
106
107        body = "Immediate Delivery"
108        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
109        msg = yield queue.get(timeout=5)
110        self.assert_(msg.content.body == body)
111
112        # TODO: Ensure we fail if immediate=True and there's no consumer.
113
114    @inlineCallbacks
115    def test_basic_delivery_queued(self):
116        """
117        Test basic message delivery where publish is issued before consume
118        (i.e. requires queueing of the message)
119        """
120        channel = self.channel
121        yield self.exchange_declare(channel, exchange="test-exchange", type="direct")
122        yield self.queue_declare(channel, queue="test-queue")
123        yield channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
124        body = "Queued Delivery"
125        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
126        reply = yield channel.basic_consume(queue="test-queue", no_ack=True)
127        queue = yield self.client.queue(reply.consumer_tag)
128        msg = yield queue.get(timeout=5)
129        self.assert_(msg.content.body == body)
130
131    @inlineCallbacks
132    def test_invalid_channel(self):
133        channel = yield self.client.channel(200)
134        try:
135            yield channel.queue_declare(exclusive=True)
136            self.fail("Expected error on queue_declare for invalid channel")
137        except ConnectionClosed as e:
138            self.assertConnectionException(504, e.args[0])
139
140    @inlineCallbacks
141    def test_closed_channel(self):
142        channel = yield self.client.channel(200)
143        yield channel.channel_open()
144        yield channel.channel_close()
145        try:
146            yield channel.queue_declare(exclusive=True)
147            self.fail("Expected error on queue_declare for closed channel")
148        except ConnectionClosed as e:
149            self.assertConnectionException(504, e.args[0])
150
151    @SupportedBrokers(QPID, OPENAMQ)
152    @inlineCallbacks
153    def test_channel_flow(self):
154        channel = self.channel
155        yield channel.queue_declare(queue="flow_test_queue", exclusive=True)
156        yield channel.basic_consume(consumer_tag="my-tag", queue="flow_test_queue")
157        incoming = yield self.client.queue("my-tag")
158
159        yield channel.channel_flow(active=False)
160        channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
161        try:
162            yield incoming.get(timeout=1)
163            self.fail("Received message when flow turned off.")
164        except Empty:
165            pass
166
167        yield channel.channel_flow(active=True)
168        msg = yield incoming.get(timeout=1)
169        self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
170
171    @inlineCallbacks
172    def test_close_cleanly(self):
173        """
174        Test closing a client cleanly, by sending 'close' and waiting for
175        'close-ok'.
176        """
177        yield self.client.close(within=5)
178        yield self.client.disconnected.wait()
179        self.assertTrue(self.client.closed)
180