1#!/usr/local/bin/python3.8
2
3import socket
4import uuid
5import paho.mqtt.client as mqtt
6import asyncio
7
# A random UUID keeps this run's identifier unique; the very same string is
# used both as the MQTT client id and as the topic the script talks on.
topic = client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
print("Using client_id / topic: " + topic)
11
12
class AsyncioHelper:
    """Wire a client's socket callbacks into an asyncio event loop.

    On construction the helper installs its own bound methods as the
    client's ``on_socket_open``, ``on_socket_close``,
    ``on_socket_register_write`` and ``on_socket_unregister_write``
    callbacks. Those callbacks register/unregister the socket with the event
    loop so that ``loop_read`` / ``loop_write`` are invoked when the socket
    is ready, and a background task calls ``loop_misc`` once per second
    while the socket is open.
    """

    def __init__(self, loop, client):
        """Remember *loop* and *client* and hook the socket callbacks."""
        self.loop = loop
        self.client = client
        for hook in (
            "on_socket_open",
            "on_socket_close",
            "on_socket_register_write",
            "on_socket_unregister_write",
        ):
            setattr(client, hook, getattr(self, hook))

    def on_socket_open(self, client, userdata, sock):
        """Start watching *sock* for readability and launch the misc task."""
        print("Socket opened")

        def handle_readable():
            print("Socket is readable, calling loop_read")
            client.loop_read()

        self.loop.add_reader(sock, handle_readable)
        # Stored on the instance so on_socket_close can cancel it later.
        self.misc = self.loop.create_task(self.misc_loop())

    def on_socket_close(self, client, userdata, sock):
        """Stop watching *sock* for readability and cancel the misc task."""
        print("Socket closed")
        self.loop.remove_reader(sock)
        self.misc.cancel()

    def on_socket_register_write(self, client, userdata, sock):
        """Start watching *sock* for writability."""
        print("Watching socket for writability.")

        def handle_writable():
            print("Socket is writable, calling loop_write")
            client.loop_write()

        self.loop.add_writer(sock, handle_writable)

    def on_socket_unregister_write(self, client, userdata, sock):
        """Stop watching *sock* for writability."""
        print("Stop watching socket for writability.")
        self.loop.remove_writer(sock)

    async def misc_loop(self):
        """Call ``client.loop_misc()`` about once per second.

        The loop ends when ``loop_misc`` returns anything other than
        ``mqtt.MQTT_ERR_SUCCESS`` or when the task is cancelled while
        sleeping; cancellation is absorbed so the closing message is still
        printed.
        """
        print("misc_loop started")
        while True:
            if self.client.loop_misc() != mqtt.MQTT_ERR_SUCCESS:
                break
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                break
        print("misc_loop finished")
58
59
class AsyncMqttExample:
    """Publish three large messages over MQTT and wait for each to come back.

    The client subscribes to the same topic it publishes on, so every
    published payload is expected to be delivered back through
    ``on_message``. Coordination between the MQTT callbacks and ``main`` is
    done with two asyncio futures: ``got_message`` (one per publish) and
    ``disconnected``.
    """

    def __init__(self, loop):
        """Store the asyncio event loop used to create futures."""
        self.loop = loop

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the module-level ``topic`` once connected."""
        print("Subscribing")
        client.subscribe(topic)

    def on_message(self, client, userdata, msg):
        """Resolve the pending ``got_message`` future with the payload.

        A message that arrives while no publish is awaiting a reply (the
        future is ``None``) or after the reply was already recorded (the
        future is done, e.g. a duplicate delivery) is reported as unexpected
        instead of raising from inside the callback.
        """
        pending = self.got_message
        if pending is None or pending.done():
            # The payload lives on msg.payload; the message object itself has
            # no decode(). Undecodable bytes are replaced rather than raising.
            print("Got unexpected message: {}".format(
                msg.payload.decode(errors="replace")))
        else:
            pending.set_result(msg.payload)

    def on_disconnect(self, client, userdata, rc):
        """Record the disconnect result code in the ``disconnected`` future."""
        # Only the first disconnect is recorded; setting a result twice on a
        # future raises InvalidStateError.
        if not self.disconnected.done():
            self.disconnected.set_result(rc)

    async def main(self):
        """Connect, run three publish/receive round trips, then disconnect."""
        self.disconnected = self.loop.create_future()
        self.got_message = None

        self.client = mqtt.Client(client_id=client_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

        # The helper installs its bound methods as socket callbacks on the
        # client, which keeps it alive; no local reference is needed.
        AsyncioHelper(self.loop, self.client)

        self.client.connect('mqtt.eclipse.org', 1883, 60)
        # NOTE(review): the small send buffer presumably forces large
        # publishes to be written in several pieces, exercising the
        # writability callbacks - confirm intent.
        self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

        for _ in range(3):
            await asyncio.sleep(5)
            print("Publishing")
            self.got_message = self.loop.create_future()
            self.client.publish(topic, b'Hello' * 40000, qos=1)
            msg = await self.got_message
            print("Got response with {} bytes".format(len(msg)))
            self.got_message = None

        self.client.disconnect()
        print("Disconnected: {}".format(await self.disconnected))
102
103
print("Starting")
# Create the loop explicitly: calling asyncio.get_event_loop() with no running
# loop is deprecated in newer Python versions. The loop is still installed as
# the current one and kept in the module-level name `loop`.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(AsyncMqttExample(loop).main())
finally:
    # Close the loop even when main() raises so its resources are released.
    loop.close()
print("Finished")
109