#!/usr/local/bin/python3.8
"""Drive a paho MQTT client from an asyncio event loop.

The script connects to a broker, subscribes to a topic named after its own
randomly generated client id, publishes a large payload to that topic three
times, waits for each message to come back, and then disconnects.
"""

import socket
import uuid
import paho.mqtt.client as mqtt
import asyncio

# A fresh UUID per run; the same string doubles as the topic.
client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
topic = client_id
print("Using client_id / topic: " + client_id)


class AsyncioHelper:
    """Hook a paho client's socket callbacks up to an asyncio event loop.

    The constructor installs this object's ``on_socket_*`` methods on the
    client; those methods register or unregister the client's socket with the
    loop so that ``loop_read`` / ``loop_write`` / ``loop_misc`` are called by
    the loop.
    """

    def __init__(self, loop, client):
        """Store *loop* and *client* and install the socket callbacks."""
        self.loop = loop
        self.client = client
        self.client.on_socket_open = self.on_socket_open
        self.client.on_socket_close = self.on_socket_close
        self.client.on_socket_register_write = self.on_socket_register_write
        self.client.on_socket_unregister_write = self.on_socket_unregister_write

    def on_socket_open(self, client, userdata, sock):
        """Watch *sock* for readability and start the periodic misc task."""
        print("Socket opened")

        def cb():
            print("Socket is readable, calling loop_read")
            client.loop_read()

        self.loop.add_reader(sock, cb)
        # Keep a handle on the task so on_socket_close can cancel it.
        self.misc = self.loop.create_task(self.misc_loop())

    def on_socket_close(self, client, userdata, sock):
        """Stop watching *sock* for readability and cancel the misc task."""
        print("Socket closed")
        self.loop.remove_reader(sock)
        self.misc.cancel()

    def on_socket_register_write(self, client, userdata, sock):
        """Watch *sock* for writability, calling ``loop_write`` when ready."""
        print("Watching socket for writability.")

        def cb():
            print("Socket is writable, calling loop_write")
            client.loop_write()

        self.loop.add_writer(sock, cb)

    def on_socket_unregister_write(self, client, userdata, sock):
        """Stop watching *sock* for writability."""
        print("Stop watching socket for writability.")
        self.loop.remove_writer(sock)

    async def misc_loop(self):
        """Call ``client.loop_misc()`` once per second.

        The loop ends when ``loop_misc`` returns anything other than
        ``mqtt.MQTT_ERR_SUCCESS`` or when the task is cancelled.
        """
        print("misc_loop started")
        while self.client.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                # Cancellation (see on_socket_close) is the normal way out.
                break
        print("misc_loop finished")


class AsyncMqttExample:
    """Publish to our own topic and wait for each message to come back."""

    def __init__(self, loop):
        """Remember the event loop used to create futures in ``main``."""
        self.loop = loop

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the module-level ``topic``."""
        print("Subscribing")
        client.subscribe(topic)

    def on_message(self, client, userdata, msg):
        """Hand the payload to the waiting future, or report a stray message.

        ``self.got_message`` is ``None`` whenever ``main`` is not currently
        waiting for a message.
        """
        if self.got_message is None:
            # The bytes live on ``msg.payload``; the message object itself has
            # no ``decode`` method. ``errors="replace"`` keeps this diagnostic
            # print from raising on a payload that is not valid UTF-8.
            print("Got unexpected message: {}".format(
                msg.payload.decode(errors="replace")))
        else:
            self.got_message.set_result(msg.payload)

    def on_disconnect(self, client, userdata, rc):
        """Resolve the ``disconnected`` future with the result code."""
        self.disconnected.set_result(rc)

    async def main(self):
        """Connect, do three publish/receive round trips, then disconnect."""
        self.disconnected = self.loop.create_future()
        self.got_message = None

        self.client = mqtt.Client(client_id=client_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

        # Constructed for its side effect: it installs the socket callbacks
        # on the client, so it must exist before connect() is called.
        aioh = AsyncioHelper(self.loop, self.client)

        self.client.connect('mqtt.eclipse.org', 1883, 60)
        # NOTE(review): presumably the small send buffer is there so the large
        # publish below cannot be written in one go, which exercises the
        # register/unregister-write callbacks — confirm.
        self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

        for c in range(3):
            await asyncio.sleep(5)
            print("Publishing")
            # Create the future before publishing so on_message can find it.
            self.got_message = self.loop.create_future()
            self.client.publish(topic, b'Hello' * 40000, qos=1)
            msg = await self.got_message
            print("Got response with {} bytes".format(len(msg)))
            self.got_message = None

        self.client.disconnect()
        print("Disconnected: {}".format(await self.disconnected))


print("Starting")
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncMqttExample(loop).main())
loop.close()
print("Finished")