# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import functools
import logging
import json
import pika

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)


class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.

    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.

    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.

    """
    EXCHANGE = 'message'
    EXCHANGE_TYPE = 'topic'
    PUBLISH_INTERVAL = 1
    QUEUE = 'text'
    ROUTING_KEY = 'example.text'
    # Seconds to wait before run() attempts a new connection after the
    # previous one failed to open or was closed unexpectedly.
    RECONNECT_DELAY = 5

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.

        :param str amqp_url: The URL for connecting to RabbitMQ

        """
        self._connection = None
        self._channel = None

        # Per-connection bookkeeping; (re)initialised by run() before every
        # connection attempt.
        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.

        :rtype: pika.SelectConnection

        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.

        :param pika.SelectConnection _unused_connection: The connection

        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established. The IOLoop is stopped after RECONNECT_DELAY
        seconds so that run() can make another connection attempt.

        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error

        """
        LOGGER.error('Connection open failed, reopening in %s seconds: %s',
                     self.RECONNECT_DELAY, err)
        self._schedule_ioloop_stop()

    def on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed. If we are stopping, the IOLoop is stopped right away;
        otherwise the closure is treated as unexpected and the IOLoop is
        stopped after RECONNECT_DELAY seconds so that run() reconnects.

        :param pika.connection.Connection _unused_connection: The closed
            connection obj
        :param Exception reason: exception representing reason for loss of
            connection.

        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in %s seconds: %s',
                           self.RECONNECT_DELAY, reason)
            self._schedule_ioloop_stop()

    def _schedule_ioloop_stop(self):
        """Stop the current connection's IOLoop after RECONNECT_DELAY
        seconds, which returns control to the loop in run().

        """
        ioloop = self._connection.ioloop
        ioloop.call_later(self.RECONNECT_DELAY, ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.

        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.

        Since the channel is now open, we'll declare the exchange to use.

        :param pika.channel.Channel channel: The channel object

        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.

        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Invoked by pika when the channel is closed.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.

        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed

        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            # Go through the guarded helper: the connection may already be
            # closing or closed, in which case calling close() on it again
            # would raise instead of shutting down cleanly.
            self.close_connection()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.

        :param str|unicode exchange_name: The name of the exchange to declare

        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.

        :param pika.Frame.Method _unused_frame: Exchange.DeclareOk response
            frame
        :param str|unicode userdata: Extra user data (exchange name)

        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.

        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name, callback=self.on_queue_declareok)

    def on_queue_declareok(self, _unused_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.

        :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame

        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing.

        :param pika.frame.Method _unused_frame: The Queue.BindOk frame

        """
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ

        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.

        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.

        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.

        A frame whose ``multiple`` flag is set confirms every pending message
        up to and including its delivery tag, so all of those are counted and
        removed, not just the tag named in the frame. A confirmation that
        matches no pending message is logged and otherwise ignored.

        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame

        """
        method = method_frame.method
        confirmation_type = method.NAME.split('.')[1].lower()
        delivery_tag = method.delivery_tag
        multiple = bool(getattr(method, 'multiple', False))
        LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
                    delivery_tag)

        if multiple:
            LOGGER.info('Confirmation covers multiple messages up to tag %i',
                        delivery_tag)
            # Per the AMQP 0-9-1 definition of the "multiple" bit, a delivery
            # tag of zero refers to all outstanding messages.
            confirmed = [
                tag for tag in self._deliveries
                if delivery_tag == 0 or tag <= delivery_tag
            ]
        elif delivery_tag in self._deliveries:
            confirmed = [delivery_tag]
        else:
            confirmed = []

        if not confirmed:
            LOGGER.warning(
                'Received %s for delivery tag %i, but no matching message '
                'is pending confirmation', confirmation_type, delivery_tag)

        if confirmation_type == 'ack':
            self._acked += len(confirmed)
        elif confirmation_type == 'nack':
            self._nacked += len(confirmed)

        settled = set(confirmed)
        self._deliveries = [
            tag for tag in self._deliveries if tag not in settled
        ]
        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def schedule_next_message(self):
        """Schedule another message to be delivered in PUBLISH_INTERVAL
        seconds. Whether the message is actually sent is decided by
        publish_message when the timer fires.

        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)

    def publish_message(self):
        """If the channel is open, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmation method. If there is no open channel, nothing
        is published and no further message is scheduled.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.

        """
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs)

        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.

        Each pass of the loop resets the delivery bookkeeping, opens a fresh
        connection and blocks in its IOLoop. When the IOLoop returns and
        stop() has not been called, another connection attempt is made.
        A KeyboardInterrupt triggers stop(); the IOLoop is then started once
        more so the connection can finish closing.

        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that run() does not reconnect and the close
        callbacks know the shutdown is intentional. This method is invoked
        by run() when KeyboardInterrupt is caught; run() then starts the
        IOLoop again to allow the publisher to cleanly disconnect from
        RabbitMQ.

        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command. Does nothing when there is no channel
        or the channel is no longer open.

        """
        if self._channel is not None and self._channel.is_open:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ. Does nothing when
        there is no connection or it is already closing or closed.

        """
        connection = self._connection
        if connection is None:
            return
        if connection.is_closing or connection.is_closed:
            LOGGER.info('Connection is closing or already closed')
            return
        LOGGER.info('Closing connection')
        connection.close()


def main():
    """Configure logging and run the example publisher until interrupted."""
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)

    # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()


if __name__ == '__main__':
    main()