# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
3
import functools
import json
import logging

import pika
8
# Column-aligned log record layout: level, timestamp, logger name, function
# name and line number, followed by the message text.
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')

# Module-level logger used by every method of ExamplePublisher.
LOGGER = logging.getLogger(__name__)
12
13
class ExamplePublisher(object):
    """Example publisher that copes with unexpected RabbitMQ interactions
    such as channel and connection closures.

    If RabbitMQ closes the connection, the publisher reopens it. Look at the
    log output when that happens: there are limited reasons why a connection
    may be closed, usually permission related issues or socket timeouts.

    Delivery confirmations are enabled, and the class illustrates one way to
    keep track of which published messages RabbitMQ has confirmed.

    """
    EXCHANGE = 'message'
    EXCHANGE_TYPE = 'topic'
    PUBLISH_INTERVAL = 1
    QUEUE = 'text'
    ROUTING_KEY = 'example.text'

    def __init__(self, amqp_url):
        """Set up the publisher object, storing the URL used to connect to
        RabbitMQ. No connection is made until run() is called.

        :param str amqp_url: The URL for connecting to RabbitMQ

        """
        self._connection = None
        self._channel = None

        # Per-connection bookkeeping; (re)initialised by run() before every
        # connection attempt.
        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """Connect to RabbitMQ and return the connection handle. pika calls
        on_connection_open once the connection is established,
        on_connection_open_error if it cannot be established, and
        on_connection_closed when it is closed.

        :rtype: pika.SelectConnection

        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """Called by pika once the connection to RabbitMQ has been
        established. The connection handle is passed in but not needed here
        because it is already stored on the instance.

        :param pika.SelectConnection _unused_connection: The connection

        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_open_error(self, _unused_connection, err):
        """Called by pika if the connection to RabbitMQ can't be
        established. Stops the ioloop after 5 seconds so that run() can make
        another connection attempt.

        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error

        """
        LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
        self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def on_connection_closed(self, _unused_connection, reason):
        """Called by pika when the connection to RabbitMQ is closed. When the
        closure was requested through stop() the ioloop is stopped right
        away; otherwise it is stopped after 5 seconds so that run() can
        reconnect.

        :param pika.connection.Connection _unused_connection: The closed
            connection object
        :param Exception reason: exception representing reason for loss of
            connection.

        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self._connection.ioloop.stop)

    def open_channel(self):
        """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
        command. When RabbitMQ confirms the channel is open by sending the
        Channel.OpenOK RPC reply, on_channel_open is invoked.

        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """Called by pika when the channel has been opened. The channel is
        stored, a close callback is registered on it and the exchange is
        declared.

        :param pika.channel.Channel channel: The channel object

        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """Tell pika to call on_channel_closed if RabbitMQ unexpectedly
        closes the channel.

        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Called by pika when the channel is closed. Channels are usually
        closed if you attempt to do something that violates the protocol,
        such as re-declaring an exchange or queue with different parameters.
        Unless we are already stopping, the connection is closed as well to
        shut the object down.

        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed

        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            # Go through the guarded helper: the channel may have been closed
            # as a consequence of the connection itself going away, in which
            # case calling close() on the connection again would raise.
            self.close_connection()

    def setup_exchange(self, exchange_name):
        """Declare the exchange on RabbitMQ by invoking the Exchange.Declare
        RPC command. When it is complete, on_exchange_declareok is invoked by
        pika.

        :param str|unicode exchange_name: The name of the exchange to declare

        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        """Called by pika when RabbitMQ has finished the Exchange.Declare RPC
        command. Continues the setup by declaring the queue.

        :param pika.Frame.Method _unused_frame: Exchange.DeclareOk response
            frame
        :param str|unicode userdata: Extra user data (exchange name)

        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Declare the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, on_queue_declareok is invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.

        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name, callback=self.on_queue_declareok)

    def on_queue_declareok(self, _unused_frame):
        """Called by pika when the Queue.Declare RPC call made in setup_queue
        has completed. Binds the queue and exchange together with the routing
        key by issuing the Queue.Bind RPC command. When that command is
        complete, on_bindok is invoked by pika.

        :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame

        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        """Called by pika when it receives the Queue.BindOk response from
        RabbitMQ. Everything is now set up and bound, so publishing starts.

        :param pika.frame.Method _unused_frame: The Queue.BindOk frame

        """
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """Enable delivery confirmations and schedule the first message to be
        sent to RabbitMQ.

        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to
        close the channel and create a new one.

        When a message is confirmed by RabbitMQ, on_delivery_confirmation is
        invoked with a Basic.Ack or Basic.Nack method that indicates which
        messages are being confirmed or rejected.

        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Called by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with the
        delivery tag of the message that was published. The delivery tag is
        an integer counter indicating the message number that was sent on
        the channel via Basic.Publish.

        When the frame's ``multiple`` flag is set, the confirmation covers
        every still-pending message up to and including the delivery tag, so
        all of them are counted and removed from the pending list. A
        confirmation for a tag that is not pending is logged and otherwise
        ignored instead of raising inside the ioloop callback.

        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame

        """
        method = method_frame.method
        confirmation_type = method.NAME.split('.')[1].lower()
        delivery_tag = method.delivery_tag
        multiple = bool(getattr(method, 'multiple', False))
        LOGGER.info('Received %s for delivery tag: %i (multiple: %s)',
                    confirmation_type, delivery_tag, multiple)

        # Work out which pending messages this frame actually settles.
        if multiple:
            confirmed = [tag for tag in self._deliveries
                         if tag <= delivery_tag]
        elif delivery_tag in self._deliveries:
            confirmed = [delivery_tag]
        else:
            confirmed = []

        if confirmed:
            if confirmation_type == 'ack':
                self._acked += len(confirmed)
            elif confirmation_type == 'nack':
                self._nacked += len(confirmed)
            settled = set(confirmed)
            # Update in place so the list object itself is preserved.
            self._deliveries[:] = [tag for tag in self._deliveries
                                   if tag not in settled]
        else:
            LOGGER.warning('Received %s for unknown delivery tag: %i',
                           confirmation_type, delivery_tag)

        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def schedule_next_message(self):
        """Schedule publish_message to run in PUBLISH_INTERVAL seconds on the
        connection's ioloop. publish_message itself decides whether the
        channel is still usable when the timer fires.

        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)

    def publish_message(self):
        """If the channel is open, publish a message to RabbitMQ and append
        the message number that was sent to the list of deliveries. This
        list is used to check for delivery confirmations in
        on_delivery_confirmation.

        Once the message has been sent, another message is scheduled. The
        scheduling exists so you can get a good idea of how the process is
        flowing by slowing down and speeding up the delivery intervals via
        the PUBLISH_INTERVAL class constant.

        """
        # No channel (or one that is closing/closed): silently stop the
        # publish/schedule cycle for this connection.
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs)

        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example by connecting and then starting the IOLoop. The
        loop reconnects after every connection loss until stop() has been
        requested (here: by a KeyboardInterrupt).

        """
        while not self._stopping:
            # Fresh bookkeeping for every connection attempt: delivery tags
            # restart on a new channel.
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. A flag is
        set so that the close callbacks know the shutdown is intentional and
        do not trigger a reconnect. run() calls this when it catches
        KeyboardInterrupt and then restarts the IOLoop so the publisher can
        cleanly disconnect from RabbitMQ.

        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Close the channel with RabbitMQ by sending the Channel.Close RPC
        command. Does nothing when there is no channel or when it is already
        closing or closed, since closing it again would raise.

        """
        channel = self._channel
        if channel is None:
            return
        if channel.is_closing or channel.is_closed:
            LOGGER.info('Channel is closing or already closed')
            return
        LOGGER.info('Closing the channel')
        channel.close()

    def close_connection(self):
        """Close the connection to RabbitMQ. Does nothing when there is no
        connection or when it is already closing or closed, since closing it
        again would raise.

        """
        connection = self._connection
        if connection is None:
            return
        if connection.is_closing or connection.is_closed:
            LOGGER.info('Connection is closing or already closed')
            return
        LOGGER.info('Closing connection')
        connection.close()
361
362
def main():
    """Configure DEBUG-level logging and run an ExamplePublisher against a
    local RabbitMQ broker until it is stopped.

    """
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)

    # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()
371
372
# Run the example only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    main()
375