# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
"""Blocking pika consumer that reconnects to one of several local brokers.

The script declares a durable direct exchange and an auto-delete queue,
binds them, and consumes messages until interrupted. Connection-level
failures trigger a reconnect (after a short pause); broker-initiated
closes and channel-level errors terminate the script.
"""

import functools
import random
import time

import pika

# Pause between reconnection attempts so an unreachable set of brokers does
# not turn the recovery loop into a busy spin.
RECONNECT_DELAY_SECONDS = 1


def on_message(ch, method_frame, _header_frame, body, userdata=None):
    """Print the received message and acknowledge it.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method_frame: Delivery metadata; its ``delivery_tag`` identifies
            the message being acknowledged.
        _header_frame: Message properties (unused).
        body: Message payload.
        userdata: Optional extra value bound by the caller (for example via
            ``functools.partial``) and echoed in the printed line.
    """
    print('Userdata: {} Message body: {}'.format(userdata, body))
    ch.basic_ack(delivery_tag=method_frame.delivery_tag)


credentials = pika.PlainCredentials('guest', 'guest')

# Three candidate endpoints on localhost, differing only by port.
params1 = pika.ConnectionParameters(
    'localhost', port=5672, credentials=credentials)
params2 = pika.ConnectionParameters(
    'localhost', port=5673, credentials=credentials)
params3 = pika.ConnectionParameters(
    'localhost', port=5674, credentials=credentials)
params_all = [params1, params2, params3]

# Infinite loop
while True:
    # Reset on every attempt so the error handlers never look at a
    # connection object left over from a previous iteration.
    connection = None
    try:
        # Shuffle so each (re)connection attempt tries the endpoints in a
        # different order.
        random.shuffle(params_all)
        connection = pika.BlockingConnection(params_all)
        channel = connection.channel()
        channel.exchange_declare(
            exchange='test_exchange',
            exchange_type='direct',
            passive=False,
            durable=True,
            auto_delete=False)
        channel.queue_declare(queue='standard', auto_delete=True)
        channel.queue_bind(
            queue='standard',
            exchange='test_exchange',
            routing_key='standard_key')
        # Deliver one unacknowledged message at a time.
        channel.basic_qos(prefetch_count=1)

        on_message_callback = functools.partial(
            on_message, userdata='on_message_userdata')
        channel.basic_consume('standard', on_message_callback)

        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()

        connection.close()
        break
    # Do not recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError:
        # A channel-level failure leaves the connection itself open, so
        # close it explicitly instead of leaking it on the way out.
        if connection is not None and connection.is_open:
            connection.close()
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        time.sleep(RECONNECT_DELAY_SECONDS)
        continue