1#!/usr/bin/env python 2""" 3Test AMQP library. 4 5Repeatedly receive messages from the demo_send.py 6script, until it receives a message with 'quit' as the body. 7 82007-11-11 Barry Pederson <bp@barryp.org> 9 10""" 11from optparse import OptionParser 12 13import amqplib.client_0_8 as amqp 14 15 16def callback(msg): 17 for key, val in msg.properties.items(): 18 print ('%s: %s' % (key, str(val))) 19 for key, val in msg.delivery_info.items(): 20 print ('> %s: %s' % (key, str(val))) 21 22 print ('') 23 print (msg.body) 24 print ('-------') 25 msg.channel.basic_ack(msg.delivery_tag) 26 27 # 28 # Cancel this callback 29 # 30 if msg.body == 'quit': 31 msg.channel.basic_cancel(msg.consumer_tag) 32 33 34def main(): 35 parser = OptionParser() 36 parser.add_option('--host', dest='host', 37 help='AMQP server to connect to (default: %default)', 38 default='localhost') 39 parser.add_option('-u', '--userid', dest='userid', 40 help='userid to authenticate as (default: %default)', 41 default='guest') 42 parser.add_option('-p', '--password', dest='password', 43 help='password to authenticate with (default: %default)', 44 default='guest') 45 parser.add_option('--ssl', dest='ssl', action='store_true', 46 help='Enable SSL (default: not enabled)', 47 default=False) 48 49 options, args = parser.parse_args() 50 51 conn = amqp.Connection(options.host, userid=options.userid, password=options.password, ssl=options.ssl) 52 53 ch = conn.channel() 54 ch.access_request('/data', active=True, read=True) 55 56 ch.exchange_declare('myfan', 'fanout', auto_delete=True) 57 qname, _, _ = ch.queue_declare() 58 ch.queue_bind(qname, 'myfan') 59 ch.basic_consume(qname, callback=callback) 60 61 # 62 # Loop as long as the channel has callbacks registered 63 # 64 while ch.callbacks: 65 ch.wait() 66 67 ch.close() 68 conn.close() 69 70if __name__ == '__main__': 71 main() 72