1# -*- coding: utf-8 -*-
2# pylint: disable=C0111,C0103,R0205
3
4import functools
5import logging
6import time
7import pika
8
9LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
10              '-35s %(lineno) -5d: %(message)s')
11LOGGER = logging.getLogger(__name__)
12
13
14class ExampleConsumer(object):
15    """This is an example consumer that will handle unexpected interactions
16    with RabbitMQ such as channel and connection closures.
17
18    If RabbitMQ closes the connection, this class will stop and indicate
19    that reconnection is necessary. You should look at the output, as
20    there are limited reasons why the connection may be closed, which
21    usually are tied to permission related issues or socket timeouts.
22
23    If the channel is closed, it will indicate a problem with one of the
24    commands that were issued and that should surface in the output as well.
25
26    """
27    EXCHANGE = 'message'
28    EXCHANGE_TYPE = 'topic'
29    QUEUE = 'text'
30    ROUTING_KEY = 'example.text'
31
32    def __init__(self, amqp_url):
33        """Create a new instance of the consumer class, passing in the AMQP
34        URL used to connect to RabbitMQ.
35
36        :param str amqp_url: The AMQP url to connect with
37
38        """
39        self.should_reconnect = False
40        self.was_consuming = False
41
42        self._connection = None
43        self._channel = None
44        self._closing = False
45        self._consumer_tag = None
46        self._url = amqp_url
47        self._consuming = False
48        # In production, experiment with higher prefetch values
49        # for higher consumer throughput
50        self._prefetch_count = 1
51
52    def connect(self):
53        """This method connects to RabbitMQ, returning the connection handle.
54        When the connection is established, the on_connection_open method
55        will be invoked by pika.
56
57        :rtype: pika.SelectConnection
58
59        """
60        LOGGER.info('Connecting to %s', self._url)
61        return pika.SelectConnection(
62            parameters=pika.URLParameters(self._url),
63            on_open_callback=self.on_connection_open,
64            on_open_error_callback=self.on_connection_open_error,
65            on_close_callback=self.on_connection_closed)
66
67    def close_connection(self):
68        self._consuming = False
69        if self._connection.is_closing or self._connection.is_closed:
70            LOGGER.info('Connection is closing or already closed')
71        else:
72            LOGGER.info('Closing connection')
73            self._connection.close()
74
75    def on_connection_open(self, _unused_connection):
76        """This method is called by pika once the connection to RabbitMQ has
77        been established. It passes the handle to the connection object in
78        case we need it, but in this case, we'll just mark it unused.
79
80        :param pika.SelectConnection _unused_connection: The connection
81
82        """
83        LOGGER.info('Connection opened')
84        self.open_channel()
85
86    def on_connection_open_error(self, _unused_connection, err):
87        """This method is called by pika if the connection to RabbitMQ
88        can't be established.
89
90        :param pika.SelectConnection _unused_connection: The connection
91        :param Exception err: The error
92
93        """
94        LOGGER.error('Connection open failed: %s', err)
95        self.reconnect()
96
97    def on_connection_closed(self, _unused_connection, reason):
98        """This method is invoked by pika when the connection to RabbitMQ is
99        closed unexpectedly. Since it is unexpected, we will reconnect to
100        RabbitMQ if it disconnects.
101
102        :param pika.connection.Connection connection: The closed connection obj
103        :param Exception reason: exception representing reason for loss of
104            connection.
105
106        """
107        self._channel = None
108        if self._closing:
109            self._connection.ioloop.stop()
110        else:
111            LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
112            self.reconnect()
113
114    def reconnect(self):
115        """Will be invoked if the connection can't be opened or is
116        closed. Indicates that a reconnect is necessary then stops the
117        ioloop.
118
119        """
120        self.should_reconnect = True
121        self.stop()
122
123    def open_channel(self):
124        """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
125        command. When RabbitMQ responds that the channel is open, the
126        on_channel_open callback will be invoked by pika.
127
128        """
129        LOGGER.info('Creating a new channel')
130        self._connection.channel(on_open_callback=self.on_channel_open)
131
132    def on_channel_open(self, channel):
133        """This method is invoked by pika when the channel has been opened.
134        The channel object is passed in so we can make use of it.
135
136        Since the channel is now open, we'll declare the exchange to use.
137
138        :param pika.channel.Channel channel: The channel object
139
140        """
141        LOGGER.info('Channel opened')
142        self._channel = channel
143        self.add_on_channel_close_callback()
144        self.setup_exchange(self.EXCHANGE)
145
146    def add_on_channel_close_callback(self):
147        """This method tells pika to call the on_channel_closed method if
148        RabbitMQ unexpectedly closes the channel.
149
150        """
151        LOGGER.info('Adding channel close callback')
152        self._channel.add_on_close_callback(self.on_channel_closed)
153
154    def on_channel_closed(self, channel, reason):
155        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
156        Channels are usually closed if you attempt to do something that
157        violates the protocol, such as re-declare an exchange or queue with
158        different parameters. In this case, we'll close the connection
159        to shutdown the object.
160
161        :param pika.channel.Channel: The closed channel
162        :param Exception reason: why the channel was closed
163
164        """
165        LOGGER.warning('Channel %i was closed: %s', channel, reason)
166        self.close_connection()
167
168    def setup_exchange(self, exchange_name):
169        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
170        command. When it is complete, the on_exchange_declareok method will
171        be invoked by pika.
172
173        :param str|unicode exchange_name: The name of the exchange to declare
174
175        """
176        LOGGER.info('Declaring exchange: %s', exchange_name)
177        # Note: using functools.partial is not required, it is demonstrating
178        # how arbitrary data can be passed to the callback when it is called
179        cb = functools.partial(
180            self.on_exchange_declareok, userdata=exchange_name)
181        self._channel.exchange_declare(
182            exchange=exchange_name,
183            exchange_type=self.EXCHANGE_TYPE,
184            callback=cb)
185
186    def on_exchange_declareok(self, _unused_frame, userdata):
187        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
188        command.
189
190        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
191        :param str|unicode userdata: Extra user data (exchange name)
192
193        """
194        LOGGER.info('Exchange declared: %s', userdata)
195        self.setup_queue(self.QUEUE)
196
197    def setup_queue(self, queue_name):
198        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
199        command. When it is complete, the on_queue_declareok method will
200        be invoked by pika.
201
202        :param str|unicode queue_name: The name of the queue to declare.
203
204        """
205        LOGGER.info('Declaring queue %s', queue_name)
206        cb = functools.partial(self.on_queue_declareok, userdata=queue_name)
207        self._channel.queue_declare(queue=queue_name, callback=cb)
208
209    def on_queue_declareok(self, _unused_frame, userdata):
210        """Method invoked by pika when the Queue.Declare RPC call made in
211        setup_queue has completed. In this method we will bind the queue
212        and exchange together with the routing key by issuing the Queue.Bind
213        RPC command. When this command is complete, the on_bindok method will
214        be invoked by pika.
215
216        :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame
217        :param str|unicode userdata: Extra user data (queue name)
218
219        """
220        queue_name = userdata
221        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, queue_name,
222                    self.ROUTING_KEY)
223        cb = functools.partial(self.on_bindok, userdata=queue_name)
224        self._channel.queue_bind(
225            queue_name,
226            self.EXCHANGE,
227            routing_key=self.ROUTING_KEY,
228            callback=cb)
229
230    def on_bindok(self, _unused_frame, userdata):
231        """Invoked by pika when the Queue.Bind method has completed. At this
232        point we will set the prefetch count for the channel.
233
234        :param pika.frame.Method _unused_frame: The Queue.BindOk response frame
235        :param str|unicode userdata: Extra user data (queue name)
236
237        """
238        LOGGER.info('Queue bound: %s', userdata)
239        self.set_qos()
240
241    def set_qos(self):
242        """This method sets up the consumer prefetch to only be delivered
243        one message at a time. The consumer must acknowledge this message
244        before RabbitMQ will deliver another one. You should experiment
245        with different prefetch values to achieve desired performance.
246
247        """
248        self._channel.basic_qos(
249            prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok)
250
251    def on_basic_qos_ok(self, _unused_frame):
252        """Invoked by pika when the Basic.QoS method has completed. At this
253        point we will start consuming messages by calling start_consuming
254        which will invoke the needed RPC commands to start the process.
255
256        :param pika.frame.Method _unused_frame: The Basic.QosOk response frame
257
258        """
259        LOGGER.info('QOS set to: %d', self._prefetch_count)
260        self.start_consuming()
261
262    def start_consuming(self):
263        """This method sets up the consumer by first calling
264        add_on_cancel_callback so that the object is notified if RabbitMQ
265        cancels the consumer. It then issues the Basic.Consume RPC command
266        which returns the consumer tag that is used to uniquely identify the
267        consumer with RabbitMQ. We keep the value to use it when we want to
268        cancel consuming. The on_message method is passed in as a callback pika
269        will invoke when a message is fully received.
270
271        """
272        LOGGER.info('Issuing consumer related RPC commands')
273        self.add_on_cancel_callback()
274        self._consumer_tag = self._channel.basic_consume(
275            self.QUEUE, self.on_message)
276        self.was_consuming = True
277        self._consuming = True
278
279    def add_on_cancel_callback(self):
280        """Add a callback that will be invoked if RabbitMQ cancels the consumer
281        for some reason. If RabbitMQ does cancel the consumer,
282        on_consumer_cancelled will be invoked by pika.
283
284        """
285        LOGGER.info('Adding consumer cancellation callback')
286        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
287
288    def on_consumer_cancelled(self, method_frame):
289        """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
290        receiving messages.
291
292        :param pika.frame.Method method_frame: The Basic.Cancel frame
293
294        """
295        LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
296                    method_frame)
297        if self._channel:
298            self._channel.close()
299
300    def on_message(self, _unused_channel, basic_deliver, properties, body):
301        """Invoked by pika when a message is delivered from RabbitMQ. The
302        channel is passed for your convenience. The basic_deliver object that
303        is passed in carries the exchange, routing key, delivery tag and
304        a redelivered flag for the message. The properties passed in is an
305        instance of BasicProperties with the message properties and the body
306        is the message that was sent.
307
308        :param pika.channel.Channel _unused_channel: The channel object
309        :param pika.Spec.Basic.Deliver: basic_deliver method
310        :param pika.Spec.BasicProperties: properties
311        :param bytes body: The message body
312
313        """
314        LOGGER.info('Received message # %s from %s: %s',
315                    basic_deliver.delivery_tag, properties.app_id, body)
316        self.acknowledge_message(basic_deliver.delivery_tag)
317
318    def acknowledge_message(self, delivery_tag):
319        """Acknowledge the message delivery from RabbitMQ by sending a
320        Basic.Ack RPC method for the delivery tag.
321
322        :param int delivery_tag: The delivery tag from the Basic.Deliver frame
323
324        """
325        LOGGER.info('Acknowledging message %s', delivery_tag)
326        self._channel.basic_ack(delivery_tag)
327
328    def stop_consuming(self):
329        """Tell RabbitMQ that you would like to stop consuming by sending the
330        Basic.Cancel RPC command.
331
332        """
333        if self._channel:
334            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
335            cb = functools.partial(
336                self.on_cancelok, userdata=self._consumer_tag)
337            self._channel.basic_cancel(self._consumer_tag, cb)
338
339    def on_cancelok(self, _unused_frame, userdata):
340        """This method is invoked by pika when RabbitMQ acknowledges the
341        cancellation of a consumer. At this point we will close the channel.
342        This will invoke the on_channel_closed method once the channel has been
343        closed, which will in-turn close the connection.
344
345        :param pika.frame.Method _unused_frame: The Basic.CancelOk frame
346        :param str|unicode userdata: Extra user data (consumer tag)
347
348        """
349        self._consuming = False
350        LOGGER.info(
351            'RabbitMQ acknowledged the cancellation of the consumer: %s',
352            userdata)
353        self.close_channel()
354
355    def close_channel(self):
356        """Call to close the channel with RabbitMQ cleanly by issuing the
357        Channel.Close RPC command.
358
359        """
360        LOGGER.info('Closing the channel')
361        self._channel.close()
362
363    def run(self):
364        """Run the example consumer by connecting to RabbitMQ and then
365        starting the IOLoop to block and allow the SelectConnection to operate.
366
367        """
368        self._connection = self.connect()
369        self._connection.ioloop.start()
370
371    def stop(self):
372        """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
373        with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
374        will be invoked by pika, which will then closing the channel and
375        connection. The IOLoop is started again because this method is invoked
376        when CTRL-C is pressed raising a KeyboardInterrupt exception. This
377        exception stops the IOLoop which needs to be running for pika to
378        communicate with RabbitMQ. All of the commands issued prior to starting
379        the IOLoop will be buffered but not processed.
380
381        """
382        if not self._closing:
383            self._closing = True
384            LOGGER.info('Stopping')
385            if self._consuming:
386                self.stop_consuming()
387                self._connection.ioloop.start()
388            else:
389                self._connection.ioloop.stop()
390            LOGGER.info('Stopped')
391
392
393class ReconnectingExampleConsumer(object):
394    """This is an example consumer that will reconnect if the nested
395    ExampleConsumer indicates that a reconnect is necessary.
396
397    """
398
399    def __init__(self, amqp_url):
400        self._reconnect_delay = 0
401        self._amqp_url = amqp_url
402        self._consumer = ExampleConsumer(self._amqp_url)
403
404    def run(self):
405        while True:
406            try:
407                self._consumer.run()
408            except KeyboardInterrupt:
409                self._consumer.stop()
410                break
411            self._maybe_reconnect()
412
413    def _maybe_reconnect(self):
414        if self._consumer.should_reconnect:
415            self._consumer.stop()
416            reconnect_delay = self._get_reconnect_delay()
417            LOGGER.info('Reconnecting after %d seconds', reconnect_delay)
418            time.sleep(reconnect_delay)
419            self._consumer = ExampleConsumer(self._amqp_url)
420
421    def _get_reconnect_delay(self):
422        if self._consumer.was_consuming:
423            self._reconnect_delay = 0
424        else:
425            self._reconnect_delay += 1
426        if self._reconnect_delay > 30:
427            self._reconnect_delay = 30
428        return self._reconnect_delay
429
430
431def main():
432    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
433    amqp_url = 'amqp://guest:guest@localhost:5672/%2F'
434    consumer = ReconnectingExampleConsumer(amqp_url)
435    consumer.run()
436
437
438if __name__ == '__main__':
439    main()
440