1# -*- coding: utf-8 -*- 2# pylint: disable=C0111,C0103,R0205 3 4import functools 5import logging 6import time 7import pika 8 9LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' 10 '-35s %(lineno) -5d: %(message)s') 11LOGGER = logging.getLogger(__name__) 12 13 14class ExampleConsumer(object): 15 """This is an example consumer that will handle unexpected interactions 16 with RabbitMQ such as channel and connection closures. 17 18 If RabbitMQ closes the connection, this class will stop and indicate 19 that reconnection is necessary. You should look at the output, as 20 there are limited reasons why the connection may be closed, which 21 usually are tied to permission related issues or socket timeouts. 22 23 If the channel is closed, it will indicate a problem with one of the 24 commands that were issued and that should surface in the output as well. 25 26 """ 27 EXCHANGE = 'message' 28 EXCHANGE_TYPE = 'topic' 29 QUEUE = 'text' 30 ROUTING_KEY = 'example.text' 31 32 def __init__(self, amqp_url): 33 """Create a new instance of the consumer class, passing in the AMQP 34 URL used to connect to RabbitMQ. 35 36 :param str amqp_url: The AMQP url to connect with 37 38 """ 39 self.should_reconnect = False 40 self.was_consuming = False 41 42 self._connection = None 43 self._channel = None 44 self._closing = False 45 self._consumer_tag = None 46 self._url = amqp_url 47 self._consuming = False 48 # In production, experiment with higher prefetch values 49 # for higher consumer throughput 50 self._prefetch_count = 1 51 52 def connect(self): 53 """This method connects to RabbitMQ, returning the connection handle. 54 When the connection is established, the on_connection_open method 55 will be invoked by pika. 56 57 :rtype: pika.SelectConnection 58 59 """ 60 LOGGER.info('Connecting to %s', self._url) 61 return pika.SelectConnection( 62 parameters=pika.URLParameters(self._url), 63 on_open_callback=self.on_connection_open, 64 on_open_error_callback=self.on_connection_open_error, 65 on_close_callback=self.on_connection_closed) 66 67 def close_connection(self): 68 self._consuming = False 69 if self._connection.is_closing or self._connection.is_closed: 70 LOGGER.info('Connection is closing or already closed') 71 else: 72 LOGGER.info('Closing connection') 73 self._connection.close() 74 75 def on_connection_open(self, _unused_connection): 76 """This method is called by pika once the connection to RabbitMQ has 77 been established. It passes the handle to the connection object in 78 case we need it, but in this case, we'll just mark it unused. 79 80 :param pika.SelectConnection _unused_connection: The connection 81 82 """ 83 LOGGER.info('Connection opened') 84 self.open_channel() 85 86 def on_connection_open_error(self, _unused_connection, err): 87 """This method is called by pika if the connection to RabbitMQ 88 can't be established. 89 90 :param pika.SelectConnection _unused_connection: The connection 91 :param Exception err: The error 92 93 """ 94 LOGGER.error('Connection open failed: %s', err) 95 self.reconnect() 96 97 def on_connection_closed(self, _unused_connection, reason): 98 """This method is invoked by pika when the connection to RabbitMQ is 99 closed unexpectedly. Since it is unexpected, we will reconnect to 100 RabbitMQ if it disconnects. 101 102 :param pika.connection.Connection connection: The closed connection obj 103 :param Exception reason: exception representing reason for loss of 104 connection. 105 106 """ 107 self._channel = None 108 if self._closing: 109 self._connection.ioloop.stop() 110 else: 111 LOGGER.warning('Connection closed, reconnect necessary: %s', reason) 112 self.reconnect() 113 114 def reconnect(self): 115 """Will be invoked if the connection can't be opened or is 116 closed. Indicates that a reconnect is necessary then stops the 117 ioloop. 118 119 """ 120 self.should_reconnect = True 121 self.stop() 122 123 def open_channel(self): 124 """Open a new channel with RabbitMQ by issuing the Channel.Open RPC 125 command. When RabbitMQ responds that the channel is open, the 126 on_channel_open callback will be invoked by pika. 127 128 """ 129 LOGGER.info('Creating a new channel') 130 self._connection.channel(on_open_callback=self.on_channel_open) 131 132 def on_channel_open(self, channel): 133 """This method is invoked by pika when the channel has been opened. 134 The channel object is passed in so we can make use of it. 135 136 Since the channel is now open, we'll declare the exchange to use. 137 138 :param pika.channel.Channel channel: The channel object 139 140 """ 141 LOGGER.info('Channel opened') 142 self._channel = channel 143 self.add_on_channel_close_callback() 144 self.setup_exchange(self.EXCHANGE) 145 146 def add_on_channel_close_callback(self): 147 """This method tells pika to call the on_channel_closed method if 148 RabbitMQ unexpectedly closes the channel. 149 150 """ 151 LOGGER.info('Adding channel close callback') 152 self._channel.add_on_close_callback(self.on_channel_closed) 153 154 def on_channel_closed(self, channel, reason): 155 """Invoked by pika when RabbitMQ unexpectedly closes the channel. 156 Channels are usually closed if you attempt to do something that 157 violates the protocol, such as re-declare an exchange or queue with 158 different parameters. In this case, we'll close the connection 159 to shutdown the object. 160 161 :param pika.channel.Channel: The closed channel 162 :param Exception reason: why the channel was closed 163 164 """ 165 LOGGER.warning('Channel %i was closed: %s', channel, reason) 166 self.close_connection() 167 168 def setup_exchange(self, exchange_name): 169 """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC 170 command. When it is complete, the on_exchange_declareok method will 171 be invoked by pika. 172 173 :param str|unicode exchange_name: The name of the exchange to declare 174 175 """ 176 LOGGER.info('Declaring exchange: %s', exchange_name) 177 # Note: using functools.partial is not required, it is demonstrating 178 # how arbitrary data can be passed to the callback when it is called 179 cb = functools.partial( 180 self.on_exchange_declareok, userdata=exchange_name) 181 self._channel.exchange_declare( 182 exchange=exchange_name, 183 exchange_type=self.EXCHANGE_TYPE, 184 callback=cb) 185 186 def on_exchange_declareok(self, _unused_frame, userdata): 187 """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC 188 command. 189 190 :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame 191 :param str|unicode userdata: Extra user data (exchange name) 192 193 """ 194 LOGGER.info('Exchange declared: %s', userdata) 195 self.setup_queue(self.QUEUE) 196 197 def setup_queue(self, queue_name): 198 """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC 199 command. When it is complete, the on_queue_declareok method will 200 be invoked by pika. 201 202 :param str|unicode queue_name: The name of the queue to declare. 203 204 """ 205 LOGGER.info('Declaring queue %s', queue_name) 206 cb = functools.partial(self.on_queue_declareok, userdata=queue_name) 207 self._channel.queue_declare(queue=queue_name, callback=cb) 208 209 def on_queue_declareok(self, _unused_frame, userdata): 210 """Method invoked by pika when the Queue.Declare RPC call made in 211 setup_queue has completed. In this method we will bind the queue 212 and exchange together with the routing key by issuing the Queue.Bind 213 RPC command. When this command is complete, the on_bindok method will 214 be invoked by pika. 215 216 :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame 217 :param str|unicode userdata: Extra user data (queue name) 218 219 """ 220 queue_name = userdata 221 LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, queue_name, 222 self.ROUTING_KEY) 223 cb = functools.partial(self.on_bindok, userdata=queue_name) 224 self._channel.queue_bind( 225 queue_name, 226 self.EXCHANGE, 227 routing_key=self.ROUTING_KEY, 228 callback=cb) 229 230 def on_bindok(self, _unused_frame, userdata): 231 """Invoked by pika when the Queue.Bind method has completed. At this 232 point we will set the prefetch count for the channel. 233 234 :param pika.frame.Method _unused_frame: The Queue.BindOk response frame 235 :param str|unicode userdata: Extra user data (queue name) 236 237 """ 238 LOGGER.info('Queue bound: %s', userdata) 239 self.set_qos() 240 241 def set_qos(self): 242 """This method sets up the consumer prefetch to only be delivered 243 one message at a time. The consumer must acknowledge this message 244 before RabbitMQ will deliver another one. You should experiment 245 with different prefetch values to achieve desired performance. 246 247 """ 248 self._channel.basic_qos( 249 prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok) 250 251 def on_basic_qos_ok(self, _unused_frame): 252 """Invoked by pika when the Basic.QoS method has completed. At this 253 point we will start consuming messages by calling start_consuming 254 which will invoke the needed RPC commands to start the process. 255 256 :param pika.frame.Method _unused_frame: The Basic.QosOk response frame 257 258 """ 259 LOGGER.info('QOS set to: %d', self._prefetch_count) 260 self.start_consuming() 261 262 def start_consuming(self): 263 """This method sets up the consumer by first calling 264 add_on_cancel_callback so that the object is notified if RabbitMQ 265 cancels the consumer. It then issues the Basic.Consume RPC command 266 which returns the consumer tag that is used to uniquely identify the 267 consumer with RabbitMQ. We keep the value to use it when we want to 268 cancel consuming. The on_message method is passed in as a callback pika 269 will invoke when a message is fully received. 270 271 """ 272 LOGGER.info('Issuing consumer related RPC commands') 273 self.add_on_cancel_callback() 274 self._consumer_tag = self._channel.basic_consume( 275 self.QUEUE, self.on_message) 276 self.was_consuming = True 277 self._consuming = True 278 279 def add_on_cancel_callback(self): 280 """Add a callback that will be invoked if RabbitMQ cancels the consumer 281 for some reason. If RabbitMQ does cancel the consumer, 282 on_consumer_cancelled will be invoked by pika. 283 284 """ 285 LOGGER.info('Adding consumer cancellation callback') 286 self._channel.add_on_cancel_callback(self.on_consumer_cancelled) 287 288 def on_consumer_cancelled(self, method_frame): 289 """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer 290 receiving messages. 291 292 :param pika.frame.Method method_frame: The Basic.Cancel frame 293 294 """ 295 LOGGER.info('Consumer was cancelled remotely, shutting down: %r', 296 method_frame) 297 if self._channel: 298 self._channel.close() 299 300 def on_message(self, _unused_channel, basic_deliver, properties, body): 301 """Invoked by pika when a message is delivered from RabbitMQ. The 302 channel is passed for your convenience. The basic_deliver object that 303 is passed in carries the exchange, routing key, delivery tag and 304 a redelivered flag for the message. The properties passed in is an 305 instance of BasicProperties with the message properties and the body 306 is the message that was sent. 307 308 :param pika.channel.Channel _unused_channel: The channel object 309 :param pika.Spec.Basic.Deliver: basic_deliver method 310 :param pika.Spec.BasicProperties: properties 311 :param bytes body: The message body 312 313 """ 314 LOGGER.info('Received message # %s from %s: %s', 315 basic_deliver.delivery_tag, properties.app_id, body) 316 self.acknowledge_message(basic_deliver.delivery_tag) 317 318 def acknowledge_message(self, delivery_tag): 319 """Acknowledge the message delivery from RabbitMQ by sending a 320 Basic.Ack RPC method for the delivery tag. 321 322 :param int delivery_tag: The delivery tag from the Basic.Deliver frame 323 324 """ 325 LOGGER.info('Acknowledging message %s', delivery_tag) 326 self._channel.basic_ack(delivery_tag) 327 328 def stop_consuming(self): 329 """Tell RabbitMQ that you would like to stop consuming by sending the 330 Basic.Cancel RPC command. 331 332 """ 333 if self._channel: 334 LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ') 335 cb = functools.partial( 336 self.on_cancelok, userdata=self._consumer_tag) 337 self._channel.basic_cancel(self._consumer_tag, cb) 338 339 def on_cancelok(self, _unused_frame, userdata): 340 """This method is invoked by pika when RabbitMQ acknowledges the 341 cancellation of a consumer. At this point we will close the channel. 342 This will invoke the on_channel_closed method once the channel has been 343 closed, which will in-turn close the connection. 344 345 :param pika.frame.Method _unused_frame: The Basic.CancelOk frame 346 :param str|unicode userdata: Extra user data (consumer tag) 347 348 """ 349 self._consuming = False 350 LOGGER.info( 351 'RabbitMQ acknowledged the cancellation of the consumer: %s', 352 userdata) 353 self.close_channel() 354 355 def close_channel(self): 356 """Call to close the channel with RabbitMQ cleanly by issuing the 357 Channel.Close RPC command. 358 359 """ 360 LOGGER.info('Closing the channel') 361 self._channel.close() 362 363 def run(self): 364 """Run the example consumer by connecting to RabbitMQ and then 365 starting the IOLoop to block and allow the SelectConnection to operate. 366 367 """ 368 self._connection = self.connect() 369 self._connection.ioloop.start() 370 371 def stop(self): 372 """Cleanly shutdown the connection to RabbitMQ by stopping the consumer 373 with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok 374 will be invoked by pika, which will then closing the channel and 375 connection. The IOLoop is started again because this method is invoked 376 when CTRL-C is pressed raising a KeyboardInterrupt exception. This 377 exception stops the IOLoop which needs to be running for pika to 378 communicate with RabbitMQ. All of the commands issued prior to starting 379 the IOLoop will be buffered but not processed. 380 381 """ 382 if not self._closing: 383 self._closing = True 384 LOGGER.info('Stopping') 385 if self._consuming: 386 self.stop_consuming() 387 self._connection.ioloop.start() 388 else: 389 self._connection.ioloop.stop() 390 LOGGER.info('Stopped') 391 392 393class ReconnectingExampleConsumer(object): 394 """This is an example consumer that will reconnect if the nested 395 ExampleConsumer indicates that a reconnect is necessary. 396 397 """ 398 399 def __init__(self, amqp_url): 400 self._reconnect_delay = 0 401 self._amqp_url = amqp_url 402 self._consumer = ExampleConsumer(self._amqp_url) 403 404 def run(self): 405 while True: 406 try: 407 self._consumer.run() 408 except KeyboardInterrupt: 409 self._consumer.stop() 410 break 411 self._maybe_reconnect() 412 413 def _maybe_reconnect(self): 414 if self._consumer.should_reconnect: 415 self._consumer.stop() 416 reconnect_delay = self._get_reconnect_delay() 417 LOGGER.info('Reconnecting after %d seconds', reconnect_delay) 418 time.sleep(reconnect_delay) 419 self._consumer = ExampleConsumer(self._amqp_url) 420 421 def _get_reconnect_delay(self): 422 if self._consumer.was_consuming: 423 self._reconnect_delay = 0 424 else: 425 self._reconnect_delay += 1 426 if self._reconnect_delay > 30: 427 self._reconnect_delay = 30 428 return self._reconnect_delay 429 430 431def main(): 432 logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) 433 amqp_url = 'amqp://guest:guest@localhost:5672/%2F' 434 consumer = ReconnectingExampleConsumer(amqp_url) 435 consumer.run() 436 437 438if __name__ == '__main__': 439 main() 440