1# Copyright (c) 2012-2019 Roger Light and others
2#
3# All rights reserved. This program and the accompanying materials
4# are made available under the terms of the Eclipse Public License v2.0
5# and Eclipse Distribution License v1.0 which accompany this distribution.
6#
7# The Eclipse Public License is available at
8#    http://www.eclipse.org/legal/epl-v10.html
9# and the Eclipse Distribution License is available at
10#   http://www.eclipse.org/org/documents/edl-v10.php.
11#
12# Contributors:
13#    Roger Light - initial API and implementation
14#    Ian Craggs - MQTT V5 support
15
16import base64
17import hashlib
18import logging
19import string
20import struct
21import sys
22import threading
23import time
24import uuid
25
26from .matcher import MQTTMatcher
27from .properties import Properties
28from .reasoncodes import ReasonCodes
29from .subscribeoptions import SubscribeOptions
30
31"""
32This is an MQTT client module. MQTT is a lightweight pub/sub messaging
33protocol that is easy to implement and suitable for low powered devices.
34"""
35import collections
36import errno
37import os
38import platform
39import select
40import socket
41
42ssl = None
43try:
44    import ssl
45except ImportError:
46    pass
47
48socks = None
49try:
50    import socks
51except ImportError:
52    pass
53
54try:
55    # Python 3
56    from urllib import parse as urllib_dot_parse
57    from urllib import request as urllib_dot_request
58except ImportError:
59    # Python 2
60    import urllib as urllib_dot_request
61
62    import urlparse as urllib_dot_parse
63
64
65try:
66    # Use monotonic clock if available
67    time_func = time.monotonic
68except AttributeError:
69    time_func = time.time
70
71try:
72    import dns.resolver
73except ImportError:
74    HAVE_DNS = False
75else:
76    HAVE_DNS = True
77
78
79if platform.system() == 'Windows':
80    EAGAIN = errno.WSAEWOULDBLOCK
81else:
82    EAGAIN = errno.EAGAIN
83
84# Python 2.7 does not have BlockingIOError.  Fall back to IOError
85try:
86    BlockingIOError
87except NameError:
88    BlockingIOError  = IOError
89
90MQTTv31 = 3
91MQTTv311 = 4
92MQTTv5 = 5
93
94if sys.version_info[0] >= 3:
95    # define some alias for python2 compatibility
96    unicode = str
97    basestring = str
98
99# Message types
100CONNECT = 0x10
101CONNACK = 0x20
102PUBLISH = 0x30
103PUBACK = 0x40
104PUBREC = 0x50
105PUBREL = 0x60
106PUBCOMP = 0x70
107SUBSCRIBE = 0x80
108SUBACK = 0x90
109UNSUBSCRIBE = 0xA0
110UNSUBACK = 0xB0
111PINGREQ = 0xC0
112PINGRESP = 0xD0
113DISCONNECT = 0xE0
114AUTH = 0xF0
115
116# Log levels
117MQTT_LOG_INFO = 0x01
118MQTT_LOG_NOTICE = 0x02
119MQTT_LOG_WARNING = 0x04
120MQTT_LOG_ERR = 0x08
121MQTT_LOG_DEBUG = 0x10
122LOGGING_LEVEL = {
123    MQTT_LOG_DEBUG: logging.DEBUG,
124    MQTT_LOG_INFO: logging.INFO,
125    MQTT_LOG_NOTICE: logging.INFO,  # This has no direct equivalent level
126    MQTT_LOG_WARNING: logging.WARNING,
127    MQTT_LOG_ERR: logging.ERROR,
128}
129
130# CONNACK codes
131CONNACK_ACCEPTED = 0
132CONNACK_REFUSED_PROTOCOL_VERSION = 1
133CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
134CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
135CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
136CONNACK_REFUSED_NOT_AUTHORIZED = 5
137
138# Connection state
139mqtt_cs_new = 0
140mqtt_cs_connected = 1
141mqtt_cs_disconnecting = 2
142mqtt_cs_connect_async = 3
143
144# Message state
145mqtt_ms_invalid = 0
146mqtt_ms_publish = 1
147mqtt_ms_wait_for_puback = 2
148mqtt_ms_wait_for_pubrec = 3
149mqtt_ms_resend_pubrel = 4
150mqtt_ms_wait_for_pubrel = 5
151mqtt_ms_resend_pubcomp = 6
152mqtt_ms_wait_for_pubcomp = 7
153mqtt_ms_send_pubrec = 8
154mqtt_ms_queued = 9
155
156# Error values
157MQTT_ERR_AGAIN = -1
158MQTT_ERR_SUCCESS = 0
159MQTT_ERR_NOMEM = 1
160MQTT_ERR_PROTOCOL = 2
161MQTT_ERR_INVAL = 3
162MQTT_ERR_NO_CONN = 4
163MQTT_ERR_CONN_REFUSED = 5
164MQTT_ERR_NOT_FOUND = 6
165MQTT_ERR_CONN_LOST = 7
166MQTT_ERR_TLS = 8
167MQTT_ERR_PAYLOAD_SIZE = 9
168MQTT_ERR_NOT_SUPPORTED = 10
169MQTT_ERR_AUTH = 11
170MQTT_ERR_ACL_DENIED = 12
171MQTT_ERR_UNKNOWN = 13
172MQTT_ERR_ERRNO = 14
173MQTT_ERR_QUEUE_SIZE = 15
174MQTT_ERR_KEEPALIVE = 16
175
176MQTT_CLIENT = 0
177MQTT_BRIDGE = 1
178
179# For MQTT V5, use the clean start flag only on the first successful connect
180MQTT_CLEAN_START_FIRST_ONLY = 3
181
182sockpair_data = b"0"
183
184
185class WebsocketConnectionError(ValueError):
186    pass
187
188
189def error_string(mqtt_errno):
190    """Return the error string associated with an mqtt error number."""
191    if mqtt_errno == MQTT_ERR_SUCCESS:
192        return "No error."
193    elif mqtt_errno == MQTT_ERR_NOMEM:
194        return "Out of memory."
195    elif mqtt_errno == MQTT_ERR_PROTOCOL:
196        return "A network protocol error occurred when communicating with the broker."
197    elif mqtt_errno == MQTT_ERR_INVAL:
198        return "Invalid function arguments provided."
199    elif mqtt_errno == MQTT_ERR_NO_CONN:
200        return "The client is not currently connected."
201    elif mqtt_errno == MQTT_ERR_CONN_REFUSED:
202        return "The connection was refused."
203    elif mqtt_errno == MQTT_ERR_NOT_FOUND:
204        return "Message not found (internal error)."
205    elif mqtt_errno == MQTT_ERR_CONN_LOST:
206        return "The connection was lost."
207    elif mqtt_errno == MQTT_ERR_TLS:
208        return "A TLS error occurred."
209    elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE:
210        return "Payload too large."
211    elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED:
212        return "This feature is not supported."
213    elif mqtt_errno == MQTT_ERR_AUTH:
214        return "Authorisation failed."
215    elif mqtt_errno == MQTT_ERR_ACL_DENIED:
216        return "Access denied by ACL."
217    elif mqtt_errno == MQTT_ERR_UNKNOWN:
218        return "Unknown error."
219    elif mqtt_errno == MQTT_ERR_ERRNO:
220        return "Error defined by errno."
221    elif mqtt_errno == MQTT_ERR_QUEUE_SIZE:
222        return "Message queue full."
223    elif mqtt_errno == MQTT_ERR_KEEPALIVE:
224        return "Client or broker did not communicate in the keepalive interval."
225    else:
226        return "Unknown error."
227
228
229def connack_string(connack_code):
230    """Return the string associated with a CONNACK result."""
231    if connack_code == CONNACK_ACCEPTED:
232        return "Connection Accepted."
233    elif connack_code == CONNACK_REFUSED_PROTOCOL_VERSION:
234        return "Connection Refused: unacceptable protocol version."
235    elif connack_code == CONNACK_REFUSED_IDENTIFIER_REJECTED:
236        return "Connection Refused: identifier rejected."
237    elif connack_code == CONNACK_REFUSED_SERVER_UNAVAILABLE:
238        return "Connection Refused: broker unavailable."
239    elif connack_code == CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
240        return "Connection Refused: bad user name or password."
241    elif connack_code == CONNACK_REFUSED_NOT_AUTHORIZED:
242        return "Connection Refused: not authorised."
243    else:
244        return "Connection Refused: unknown reason."
245
246
247def base62(num, base=string.digits + string.ascii_letters, padding=1):
248    """Convert a number to base-62 representation."""
249    assert num >= 0
250    digits = []
251    while num:
252        num, rest = divmod(num, 62)
253        digits.append(base[rest])
254    digits.extend(base[0] for _ in range(len(digits), padding))
255    return ''.join(reversed(digits))
256
257
258def topic_matches_sub(sub, topic):
259    """Check whether a topic matches a subscription.
260
261    For example:
262
263    foo/bar would match the subscription foo/# or +/bar
264    non/matching would not match the subscription non/+/+
265    """
266    matcher = MQTTMatcher()
267    matcher[sub] = True
268    try:
269        next(matcher.iter_match(topic))
270        return True
271    except StopIteration:
272        return False
273
274
275def _socketpair_compat():
276    """TCP/IP socketpair including Windows support"""
277    listensock = socket.socket(
278        socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
279    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
280    listensock.bind(("127.0.0.1", 0))
281    listensock.listen(1)
282
283    iface, port = listensock.getsockname()
284    sock1 = socket.socket(
285        socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
286    sock1.setblocking(0)
287    try:
288        sock1.connect(("127.0.0.1", port))
289    except BlockingIOError:
290        pass
291    sock2, address = listensock.accept()
292    sock2.setblocking(0)
293    listensock.close()
294    return (sock1, sock2)
295
296
297class MQTTMessageInfo(object):
298    """This is a class returned from Client.publish() and can be used to find
299    out the mid of the message that was published, and to determine whether the
300    message has been published, and/or wait until it is published.
301    """
302
303    __slots__ = 'mid', '_published', '_condition', 'rc', '_iterpos'
304
305    def __init__(self, mid):
306        self.mid = mid
307        self._published = False
308        self._condition = threading.Condition()
309        self.rc = 0
310        self._iterpos = 0
311
312    def __str__(self):
313        return str((self.rc, self.mid))
314
315    def __iter__(self):
316        self._iterpos = 0
317        return self
318
319    def __next__(self):
320        return self.next()
321
322    def next(self):
323        if self._iterpos == 0:
324            self._iterpos = 1
325            return self.rc
326        elif self._iterpos == 1:
327            self._iterpos = 2
328            return self.mid
329        else:
330            raise StopIteration
331
332    def __getitem__(self, index):
333        if index == 0:
334            return self.rc
335        elif index == 1:
336            return self.mid
337        else:
338            raise IndexError("index out of range")
339
340    def _set_as_published(self):
341        with self._condition:
342            self._published = True
343            self._condition.notify()
344
345    def wait_for_publish(self, timeout=None):
346        """Block until the message associated with this object is published, or
347        until the timeout occurs. If timeout is None, this will never time out.
348        Set timeout to a positive number of seconds, e.g. 1.2, to enable the
349        timeout.
350
351        Raises ValueError if the message was not queued due to the outgoing
352        queue being full.
353
354        Raises RuntimeError if the message was not published for another
355        reason.
356        """
357        if self.rc == MQTT_ERR_QUEUE_SIZE:
358            raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
359        elif self.rc == MQTT_ERR_AGAIN:
360            pass
361        elif self.rc > 0:
362            raise RuntimeError('Message publish failed: %s' % (error_string(self.rc)))
363
364        timeout_time = None if timeout is None else time.time() + timeout
365        timeout_tenth = None if timeout is None else timeout / 10.
366        def timed_out():
367            return False if timeout is None else time.time() > timeout_time
368
369        with self._condition:
370            while not self._published and not timed_out():
371                self._condition.wait(timeout_tenth)
372
373    def is_published(self):
374        """Returns True if the message associated with this object has been
375        published, else returns False."""
376        if self.rc == MQTT_ERR_QUEUE_SIZE:
377            raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
378        elif self.rc == MQTT_ERR_AGAIN:
379            pass
380        elif self.rc > 0:
381            raise RuntimeError('Message publish failed: %s' % (error_string(self.rc)))
382
383        with self._condition:
384            return self._published
385
386
387class MQTTMessage(object):
388    """ This is a class that describes an incoming or outgoing message. It is
389    passed to the on_message callback as the message parameter.
390
391    Members:
392
393    topic : String. topic that the message was published on.
394    payload : Bytes/Byte array. the message payload.
395    qos : Integer. The message Quality of Service 0, 1 or 2.
396    retain : Boolean. If true, the message is a retained message and not fresh.
397    mid : Integer. The message id.
398    properties: Properties class. In MQTT v5.0, the properties associated with the message.
399    """
400
401    __slots__ = 'timestamp', 'state', 'dup', 'mid', '_topic', 'payload', 'qos', 'retain', 'info', 'properties'
402
403    def __init__(self, mid=0, topic=b""):
404        self.timestamp = 0
405        self.state = mqtt_ms_invalid
406        self.dup = False
407        self.mid = mid
408        self._topic = topic
409        self.payload = b""
410        self.qos = 0
411        self.retain = False
412        self.info = MQTTMessageInfo(mid)
413
414    def __eq__(self, other):
415        """Override the default Equals behavior"""
416        if isinstance(other, self.__class__):
417            return self.mid == other.mid
418        return False
419
420    def __ne__(self, other):
421        """Define a non-equality test"""
422        return not self.__eq__(other)
423
424    @property
425    def topic(self):
426        return self._topic.decode('utf-8')
427
428    @topic.setter
429    def topic(self, value):
430        self._topic = value
431
432
433class Client(object):
434    """MQTT version 3.1/3.1.1/5.0 client class.
435
436    This is the main class for use communicating with an MQTT broker.
437
438    General usage flow:
439
440    * Use connect()/connect_async() to connect to a broker
441    * Call loop() frequently to maintain network traffic flow with the broker
442    * Or use loop_start() to set a thread running to call loop() for you.
443    * Or use loop_forever() to handle calling loop() for you in a blocking
444    * function.
445    * Use subscribe() to subscribe to a topic and receive messages
446    * Use publish() to send messages
447    * Use disconnect() to disconnect from the broker
448
449    Data returned from the broker is made available with the use of callback
450    functions as described below.
451
452    Callbacks
453    =========
454
455    A number of callback functions are available to receive data back from the
456    broker. To use a callback, define a function and then assign it to the
457    client:
458
459    def on_connect(client, userdata, flags, rc):
460        print("Connection returned " + str(rc))
461
462    client.on_connect = on_connect
463
464    Callbacks can also be attached using decorators:
465
466    client = paho.mqtt.Client()
467
468    @client.connect_callback()
469    def on_connect(client, userdata, flags, rc):
470        print("Connection returned " + str(rc))
471
472
473    **IMPORTANT** the required function signature for a callback can differ
474    depending on whether you are using MQTT v5 or MQTT v3.1.1/v3.1. See the
475    documentation for each callback.
476
477    All of the callbacks as described below have a "client" and an "userdata"
478    argument. "client" is the Client instance that is calling the callback.
479    "userdata" is user data of any type and can be set when creating a new client
480    instance or with user_data_set(userdata).
481
482    If you wish to suppress exceptions within a callback, you should set
483    `client.suppress_exceptions = True`
484
485    The callbacks are listed below, documentation for each of them can be found
486    at the same function name:
487
488    on_connect, on_connect_fail, on_disconnect, on_message, on_publish,
489    on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close,
490    on_socket_register_write, on_socket_unregister_write
491    """
492
493    def __init__(self, client_id="", clean_session=None, userdata=None,
494                 protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
495        """client_id is the unique client id string used when connecting to the
496        broker. If client_id is zero length or None, then the behaviour is
497        defined by which protocol version is in use. If using MQTT v3.1.1, then
498        a zero length client id will be sent to the broker and the broker will
499        generate a random for the client. If using MQTT v3.1 then an id will be
500        randomly generated. In both cases, clean_session must be True. If this
501        is not the case a ValueError will be raised.
502
503        clean_session is a boolean that determines the client type. If True,
504        the broker will remove all information about this client when it
505        disconnects. If False, the client is a persistent client and
506        subscription information and queued messages will be retained when the
507        client disconnects.
508        Note that a client will never discard its own outgoing messages on
509        disconnect. Calling connect() or reconnect() will cause the messages to
510        be resent.  Use reinitialise() to reset a client to its original state.
511        The clean_session argument only applies to MQTT versions v3.1.1 and v3.1.
512        It is not accepted if the MQTT version is v5.0 - use the clean_start
513        argument on connect() instead.
514
515        userdata is user defined data of any type that is passed as the "userdata"
516        parameter to callbacks. It may be updated at a later point with the
517        user_data_set() function.
518
519        The protocol argument allows explicit setting of the MQTT version to
520        use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1),
521        paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0),
522        with the default being v3.1.1.
523
524        Set transport to "websockets" to use WebSockets as the transport
525        mechanism. Set to "tcp" to use raw TCP, which is the default.
526        """
527
528        if transport.lower() not in ('websockets', 'tcp'):
529            raise ValueError(
530                'transport must be "websockets" or "tcp", not %s' % transport)
531        self._transport = transport.lower()
532        self._protocol = protocol
533        self._userdata = userdata
534        self._sock = None
535        self._sockpairR, self._sockpairW = (None, None,)
536        self._keepalive = 60
537        self._connect_timeout = 5.0
538        self._client_mode = MQTT_CLIENT
539
540        if protocol == MQTTv5:
541            if clean_session is not None:
542                raise ValueError('Clean session is not used for MQTT 5.0')
543        else:
544            if clean_session is None:
545                clean_session = True
546            if not clean_session and (client_id == "" or client_id is None):
547                raise ValueError(
548                    'A client id must be provided if clean session is False.')
549            self._clean_session = clean_session
550
551        # [MQTT-3.1.3-4] Client Id must be UTF-8 encoded string.
552        if client_id == "" or client_id is None:
553            if protocol == MQTTv31:
554                self._client_id = base62(uuid.uuid4().int, padding=22)
555            else:
556                self._client_id = b""
557        else:
558            self._client_id = client_id
559        if isinstance(self._client_id, unicode):
560            self._client_id = self._client_id.encode('utf-8')
561
562        self._username = None
563        self._password = None
564        self._in_packet = {
565            "command": 0,
566            "have_remaining": 0,
567            "remaining_count": [],
568            "remaining_mult": 1,
569            "remaining_length": 0,
570            "packet": bytearray(b""),
571            "to_process": 0,
572            "pos": 0}
573        self._out_packet = collections.deque()
574        self._last_msg_in = time_func()
575        self._last_msg_out = time_func()
576        self._reconnect_min_delay = 1
577        self._reconnect_max_delay = 120
578        self._reconnect_delay = None
579        self._reconnect_on_failure = reconnect_on_failure
580        self._ping_t = 0
581        self._last_mid = 0
582        self._state = mqtt_cs_new
583        self._out_messages = collections.OrderedDict()
584        self._in_messages = collections.OrderedDict()
585        self._max_inflight_messages = 20
586        self._inflight_messages = 0
587        self._max_queued_messages = 0
588        self._connect_properties = None
589        self._will_properties = None
590        self._will = False
591        self._will_topic = b""
592        self._will_payload = b""
593        self._will_qos = 0
594        self._will_retain = False
595        self._on_message_filtered = MQTTMatcher()
596        self._host = ""
597        self._port = 1883
598        self._bind_address = ""
599        self._bind_port = 0
600        self._proxy = {}
601        self._in_callback_mutex = threading.Lock()
602        self._callback_mutex = threading.RLock()
603        self._msgtime_mutex = threading.Lock()
604        self._out_message_mutex = threading.RLock()
605        self._in_message_mutex = threading.Lock()
606        self._reconnect_delay_mutex = threading.Lock()
607        self._mid_generate_mutex = threading.Lock()
608        self._thread = None
609        self._thread_terminate = False
610        self._ssl = False
611        self._ssl_context = None
612        # Only used when SSL context does not have check_hostname attribute
613        self._tls_insecure = False
614        self._logger = None
615        self._registered_write = False
616        # No default callbacks
617        self._on_log = None
618        self._on_connect = None
619        self._on_connect_fail = None
620        self._on_subscribe = None
621        self._on_message = None
622        self._on_publish = None
623        self._on_unsubscribe = None
624        self._on_disconnect = None
625        self._on_socket_open = None
626        self._on_socket_close = None
627        self._on_socket_register_write = None
628        self._on_socket_unregister_write = None
629        self._websocket_path = "/mqtt"
630        self._websocket_extra_headers = None
631        # for clean_start == MQTT_CLEAN_START_FIRST_ONLY
632        self._mqttv5_first_connect = True
633        self.suppress_exceptions = False # For callbacks
634
635    def __del__(self):
636        self._reset_sockets()
637
638    def _sock_recv(self, bufsize):
639        try:
640            return self._sock.recv(bufsize)
641        except ssl.SSLWantReadError:
642            raise BlockingIOError
643        except ssl.SSLWantWriteError:
644            self._call_socket_register_write()
645            raise BlockingIOError
646
647    def _sock_send(self, buf):
648        try:
649            return self._sock.send(buf)
650        except ssl.SSLWantReadError:
651            raise BlockingIOError
652        except ssl.SSLWantWriteError:
653            self._call_socket_register_write()
654            raise BlockingIOError
655        except BlockingIOError:
656            self._call_socket_register_write()
657            raise BlockingIOError
658
659    def _sock_close(self):
660        """Close the connection to the server."""
661        if not self._sock:
662            return
663
664        try:
665            sock = self._sock
666            self._sock = None
667            self._call_socket_unregister_write(sock)
668            self._call_socket_close(sock)
669        finally:
670            # In case a callback fails, still close the socket to avoid leaking the file descriptor.
671            sock.close()
672
673    def _reset_sockets(self, sockpair_only=False):
674        if sockpair_only == False:
675            self._sock_close()
676
677        if self._sockpairR:
678            self._sockpairR.close()
679            self._sockpairR = None
680        if self._sockpairW:
681            self._sockpairW.close()
682            self._sockpairW = None
683
684    def reinitialise(self, client_id="", clean_session=True, userdata=None):
685        self._reset_sockets()
686
687        self.__init__(client_id, clean_session, userdata)
688
689    def ws_set_options(self, path="/mqtt", headers=None):
690        """ Set the path and headers for a websocket connection
691
692        path is a string starting with / which should be the endpoint of the
693        mqtt connection on the remote server
694
695        headers can be either a dict or a callable object. If it is a dict then
696        the extra items in the dict are added to the websocket headers. If it is
697        a callable, then the default websocket headers are passed into this
698        function and the result is used as the new headers.
699        """
700        self._websocket_path = path
701
702        if headers is not None:
703            if isinstance(headers, dict) or callable(headers):
704                self._websocket_extra_headers = headers
705            else:
706                raise ValueError(
707                    "'headers' option to ws_set_options has to be either a dictionary or callable")
708
709    def tls_set_context(self, context=None):
710        """Configure network encryption and authentication context. Enables SSL/TLS support.
711
712        context : an ssl.SSLContext object. By default this is given by
713        `ssl.create_default_context()`, if available.
714
715        Must be called before connect() or connect_async()."""
716        if self._ssl_context is not None:
717            raise ValueError('SSL/TLS has already been configured.')
718
719        # Assume that have SSL support, or at least that context input behaves like ssl.SSLContext
720        # in current versions of Python
721
722        if context is None:
723            if hasattr(ssl, 'create_default_context'):
724                context = ssl.create_default_context()
725            else:
726                raise ValueError('SSL/TLS context must be specified')
727
728        self._ssl = True
729        self._ssl_context = context
730
731        # Ensure _tls_insecure is consistent with check_hostname attribute
732        if hasattr(context, 'check_hostname'):
733            self._tls_insecure = not context.check_hostname
734
735    def tls_set(self, ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=None, ciphers=None, keyfile_password=None):
736        """Configure network encryption and authentication options. Enables SSL/TLS support.
737
738        ca_certs : a string path to the Certificate Authority certificate files
739        that are to be treated as trusted by this client. If this is the only
740        option given then the client will operate in a similar manner to a web
741        browser. That is to say it will require the broker to have a
742        certificate signed by the Certificate Authorities in ca_certs and will
743        communicate using TLS v1,2, but will not attempt any form of
744        authentication. This provides basic network encryption but may not be
745        sufficient depending on how the broker is configured.
746        By default, on Python 2.7.9+ or 3.4+, the default certification
747        authority of the system is used. On older Python version this parameter
748        is mandatory.
749
750        certfile and keyfile are strings pointing to the PEM encoded client
751        certificate and private keys respectively. If these arguments are not
752        None then they will be used as client information for TLS based
753        authentication.  Support for this feature is broker dependent. Note
754        that if either of these files in encrypted and needs a password to
755        decrypt it, then this can be passed using the keyfile_password
756        argument - you should take precautions to ensure that your password is
757        not hard coded into your program by loading the password from a file
758        for example. If you do not provide keyfile_password, the password will
759        be requested to be typed in at a terminal window.
760
761        cert_reqs allows the certificate requirements that the client imposes
762        on the broker to be changed. By default this is ssl.CERT_REQUIRED,
763        which means that the broker must provide a certificate. See the ssl
764        pydoc for more information on this parameter.
765
766        tls_version allows the version of the SSL/TLS protocol used to be
767        specified. By default TLS v1.2 is used. Previous versions are allowed
768        but not recommended due to possible security problems.
769
770        ciphers is a string specifying which encryption ciphers are allowable
771        for this connection, or None to use the defaults. See the ssl pydoc for
772        more information.
773
774        Must be called before connect() or connect_async()."""
775        if ssl is None:
776            raise ValueError('This platform has no SSL/TLS.')
777
778        if not hasattr(ssl, 'SSLContext'):
779            # Require Python version that has SSL context support in standard library
780            raise ValueError(
781                'Python 2.7.9 and 3.2 are the minimum supported versions for TLS.')
782
783        if ca_certs is None and not hasattr(ssl.SSLContext, 'load_default_certs'):
784            raise ValueError('ca_certs must not be None.')
785
786        # Create SSLContext object
787        if tls_version is None:
788            tls_version = ssl.PROTOCOL_TLSv1_2
789            # If the python version supports it, use highest TLS version automatically
790            if hasattr(ssl, "PROTOCOL_TLS"):
791                tls_version = ssl.PROTOCOL_TLS
792        context = ssl.SSLContext(tls_version)
793
794        # Configure context
795        if certfile is not None:
796            context.load_cert_chain(certfile, keyfile, keyfile_password)
797
798        if cert_reqs == ssl.CERT_NONE and hasattr(context, 'check_hostname'):
799            context.check_hostname = False
800
801        context.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
802
803        if ca_certs is not None:
804            context.load_verify_locations(ca_certs)
805        else:
806            context.load_default_certs()
807
808        if ciphers is not None:
809            context.set_ciphers(ciphers)
810
811        self.tls_set_context(context)
812
813        if cert_reqs != ssl.CERT_NONE:
814            # Default to secure, sets context.check_hostname attribute
815            # if available
816            self.tls_insecure_set(False)
817        else:
818            # But with ssl.CERT_NONE, we can not check_hostname
819            self.tls_insecure_set(True)
820
821    def tls_insecure_set(self, value):
822        """Configure verification of the server hostname in the server certificate.
823
824        If value is set to true, it is impossible to guarantee that the host
825        you are connecting to is not impersonating your server. This can be
826        useful in initial server testing, but makes it possible for a malicious
827        third party to impersonate your server through DNS spoofing, for
828        example.
829
830        Do not use this function in a real system. Setting value to true means
831        there is no point using encryption.
832
833        Must be called before connect() and after either tls_set() or
834        tls_set_context()."""
835
836        if self._ssl_context is None:
837            raise ValueError(
838                'Must configure SSL context before using tls_insecure_set.')
839
840        self._tls_insecure = value
841
842        # Ensure check_hostname is consistent with _tls_insecure attribute
843        if hasattr(self._ssl_context, 'check_hostname'):
844            # Rely on SSLContext to check host name
845            # If verify_mode is CERT_NONE then the host name will never be checked
846            self._ssl_context.check_hostname = not value
847
848    def proxy_set(self, **proxy_args):
849        """Configure proxying of MQTT connection. Enables support for SOCKS or
850        HTTP proxies.
851
852        Proxying is done through the PySocks library. Brief descriptions of the
853        proxy_args parameters are below; see the PySocks docs for more info.
854
855        (Required)
856        proxy_type: One of {socks.HTTP, socks.SOCKS4, or socks.SOCKS5}
857        proxy_addr: IP address or DNS name of proxy server
858
859        (Optional)
860        proxy_rdns: boolean indicating whether proxy lookup should be performed
861            remotely (True, default) or locally (False)
862        proxy_username: username for SOCKS5 proxy, or userid for SOCKS4 proxy
863        proxy_password: password for SOCKS5 proxy
864
865        Must be called before connect() or connect_async()."""
866        if socks is None:
867            raise ValueError("PySocks must be installed for proxy support.")
868        elif not self._proxy_is_valid(proxy_args):
869            raise ValueError("proxy_type and/or proxy_addr are invalid.")
870        else:
871            self._proxy = proxy_args
872
873    def enable_logger(self, logger=None):
874        """ Enables a logger to send log messages to """
875        if logger is None:
876            if self._logger is not None:
877                # Do not replace existing logger
878                return
879            logger = logging.getLogger(__name__)
880        self._logger = logger
881
882    def disable_logger(self):
883        self._logger = None
884
885    def connect(self, host, port=1883, keepalive=60, bind_address="", bind_port=0,
886                clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None):
887        """Connect to a remote broker.
888
889        host is the hostname or IP address of the remote broker.
890        port is the network port of the server host to connect to. Defaults to
891        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
892        are using tls_set() the port may need providing.
893        keepalive: Maximum period in seconds between communications with the
894        broker. If no other messages are being exchanged, this controls the
895        rate at which the client will send ping messages to the broker.
896        clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY.
897        Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only,
898        respectively.  MQTT session data (such as outstanding messages and subscriptions)
899        is cleared on successful connect when the clean_start flag is set.
900        properties: (MQTT v5.0 only) the MQTT v5.0 properties to be sent in the
901        MQTT connect packet.
902        """
903
904        if self._protocol == MQTTv5:
905            self._mqttv5_first_connect = True
906        else:
907            if clean_start != MQTT_CLEAN_START_FIRST_ONLY:
908                raise ValueError("Clean start only applies to MQTT V5")
909            if properties != None:
910                raise ValueError("Properties only apply to MQTT V5")
911
912        self.connect_async(host, port, keepalive,
913                           bind_address, bind_port, clean_start, properties)
914        return self.reconnect()
915
916    def connect_srv(self, domain=None, keepalive=60, bind_address="",
917                    clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None):
918        """Connect to a remote broker.
919
920        domain is the DNS domain to search for SRV records; if None,
921        try to determine local domain name.
922        keepalive, bind_address, clean_start and properties are as for connect()
923        """
924
925        if HAVE_DNS is False:
926            raise ValueError(
927                'No DNS resolver library found, try "pip install dnspython" or "pip3 install dnspython3".')
928
929        if domain is None:
930            domain = socket.getfqdn()
931            domain = domain[domain.find('.') + 1:]
932
933        try:
934            rr = '_mqtt._tcp.%s' % domain
935            if self._ssl:
936                # IANA specifies secure-mqtt (not mqtts) for port 8883
937                rr = '_secure-mqtt._tcp.%s' % domain
938            answers = []
939            for answer in dns.resolver.query(rr, dns.rdatatype.SRV):
940                addr = answer.target.to_text()[:-1]
941                answers.append(
942                    (addr, answer.port, answer.priority, answer.weight))
943        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
944            raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain))
945
946        # FIXME: doesn't account for weight
947        for answer in answers:
948            host, port, prio, weight = answer
949
950            try:
951                return self.connect(host, port, keepalive, bind_address, clean_start, properties)
952            except Exception:
953                pass
954
955        raise ValueError("No SRV hosts responded")
956
957    def connect_async(self, host, port=1883, keepalive=60, bind_address="", bind_port=0,
958                      clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None):
959        """Connect to a remote broker asynchronously. This is a non-blocking
960        connect call that can be used with loop_start() to provide very quick
961        start.
962
963        host is the hostname or IP address of the remote broker.
964        port is the network port of the server host to connect to. Defaults to
965        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
966        are using tls_set() the port may need providing.
967        keepalive: Maximum period in seconds between communications with the
968        broker. If no other messages are being exchanged, this controls the
969        rate at which the client will send ping messages to the broker.
970        clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY.
971        Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only,
972        respectively.  MQTT session data (such as outstanding messages and subscriptions)
973        is cleared on successful connect when the clean_start flag is set.
974        properties: (MQTT v5.0 only) the MQTT v5.0 properties to be sent in the
975        MQTT connect packet.  Use the Properties class.
976        """
977        if host is None or len(host) == 0:
978            raise ValueError('Invalid host.')
979        if port <= 0:
980            raise ValueError('Invalid port number.')
981        if keepalive < 0:
982            raise ValueError('Keepalive must be >=0.')
983        if bind_address != "" and bind_address is not None:
984            if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 2):
985                raise ValueError('bind_address requires Python 2.7 or 3.2.')
986        if bind_port < 0:
987            raise ValueError('Invalid bind port number.')
988
989        self._host = host
990        self._port = port
991        self._keepalive = keepalive
992        self._bind_address = bind_address
993        self._bind_port = bind_port
994        self._clean_start = clean_start
995        self._connect_properties = properties
996        self._state = mqtt_cs_connect_async
997
998
999    def reconnect_delay_set(self, min_delay=1, max_delay=120):
1000        """ Configure the exponential reconnect delay
1001
1002            When connection is lost, wait initially min_delay seconds and
1003            double this time every attempt. The wait is capped at max_delay.
1004            Once the client is fully connected (e.g. not only TCP socket, but
1005            received a success CONNACK), the wait timer is reset to min_delay.
1006        """
1007        with self._reconnect_delay_mutex:
1008            self._reconnect_min_delay = min_delay
1009            self._reconnect_max_delay = max_delay
1010            self._reconnect_delay = None
1011
1012    def reconnect(self):
1013        """Reconnect the client after a disconnect. Can only be called after
1014        connect()/connect_async()."""
1015        if len(self._host) == 0:
1016            raise ValueError('Invalid host.')
1017        if self._port <= 0:
1018            raise ValueError('Invalid port number.')
1019
1020        self._in_packet = {
1021            "command": 0,
1022            "have_remaining": 0,
1023            "remaining_count": [],
1024            "remaining_mult": 1,
1025            "remaining_length": 0,
1026            "packet": bytearray(b""),
1027            "to_process": 0,
1028            "pos": 0}
1029
1030        self._out_packet = collections.deque()
1031
1032        with self._msgtime_mutex:
1033            self._last_msg_in = time_func()
1034            self._last_msg_out = time_func()
1035
1036        self._ping_t = 0
1037        self._state = mqtt_cs_new
1038
1039        self._sock_close()
1040
1041        # Put messages in progress in a valid state.
1042        self._messages_reconnect_reset()
1043
1044        sock = self._create_socket_connection()
1045
1046        if self._ssl:
1047            # SSL is only supported when SSLContext is available (implies Python >= 2.7.9 or >= 3.2)
1048
1049            verify_host = not self._tls_insecure
1050            try:
1051                # Try with server_hostname, even it's not supported in certain scenarios
1052                sock = self._ssl_context.wrap_socket(
1053                    sock,
1054                    server_hostname=self._host,
1055                    do_handshake_on_connect=False,
1056                )
1057            except ssl.CertificateError:
1058                # CertificateError is derived from ValueError
1059                raise
1060            except ValueError:
1061                # Python version requires SNI in order to handle server_hostname, but SNI is not available
1062                sock = self._ssl_context.wrap_socket(
1063                    sock,
1064                    do_handshake_on_connect=False,
1065                )
1066            else:
1067                # If SSL context has already checked hostname, then don't need to do it again
1068                if (hasattr(self._ssl_context, 'check_hostname') and
1069                        self._ssl_context.check_hostname):
1070                    verify_host = False
1071
1072            sock.settimeout(self._keepalive)
1073            sock.do_handshake()
1074
1075            if verify_host:
1076                ssl.match_hostname(sock.getpeercert(), self._host)
1077
1078        if self._transport == "websockets":
1079            sock.settimeout(self._keepalive)
1080            sock = WebsocketWrapper(sock, self._host, self._port, self._ssl,
1081                                    self._websocket_path, self._websocket_extra_headers)
1082
1083        self._sock = sock
1084        self._sock.setblocking(0)
1085        self._registered_write = False
1086        self._call_socket_open()
1087
1088        return self._send_connect(self._keepalive)
1089
1090    def loop(self, timeout=1.0, max_packets=1):
1091        """Process network events.
1092
1093        It is strongly recommended that you use loop_start(), or
1094        loop_forever(), or if you are using an external event loop using
1095        loop_read(), loop_write(), and loop_misc(). Using loop() on it's own is
1096        no longer recommended.
1097
1098        This function must be called regularly to ensure communication with the
1099        broker is carried out. It calls select() on the network socket to wait
1100        for network events. If incoming data is present it will then be
1101        processed. Outgoing commands, from e.g. publish(), are normally sent
1102        immediately that their function is called, but this is not always
1103        possible. loop() will also attempt to send any remaining outgoing
1104        messages, which also includes commands that are part of the flow for
1105        messages with QoS>0.
1106
1107        timeout: The time in seconds to wait for incoming/outgoing network
1108            traffic before timing out and returning.
1109        max_packets: Not currently used.
1110
1111        Returns MQTT_ERR_SUCCESS on success.
1112        Returns >0 on error.
1113
1114        A ValueError will be raised if timeout < 0"""
1115
1116        if self._sockpairR is None or self._sockpairW is None:
1117            self._reset_sockets(sockpair_only=True)
1118            self._sockpairR, self._sockpairW = _socketpair_compat()
1119
1120        return self._loop(timeout)
1121
1122    def _loop(self, timeout=1.0):
1123        if timeout < 0.0:
1124            raise ValueError('Invalid timeout.')
1125
1126        try:
1127            packet = self._out_packet.popleft()
1128            self._out_packet.appendleft(packet)
1129            wlist = [self._sock]
1130        except IndexError:
1131            wlist = []
1132
1133        # used to check if there are any bytes left in the (SSL) socket
1134        pending_bytes = 0
1135        if hasattr(self._sock, 'pending'):
1136            pending_bytes = self._sock.pending()
1137
1138        # if bytes are pending do not wait in select
1139        if pending_bytes > 0:
1140            timeout = 0.0
1141
1142        # sockpairR is used to break out of select() before the timeout, on a
1143        # call to publish() etc.
1144        if self._sockpairR is None:
1145            rlist = [self._sock]
1146        else:
1147            rlist = [self._sock, self._sockpairR]
1148
1149        try:
1150            socklist = select.select(rlist, wlist, [], timeout)
1151        except TypeError:
1152            # Socket isn't correct type, in likelihood connection is lost
1153            return MQTT_ERR_CONN_LOST
1154        except ValueError:
1155            # Can occur if we just reconnected but rlist/wlist contain a -1 for
1156            # some reason.
1157            return MQTT_ERR_CONN_LOST
1158        except Exception:
1159            # Note that KeyboardInterrupt, etc. can still terminate since they
1160            # are not derived from Exception
1161            return MQTT_ERR_UNKNOWN
1162
1163        if self._sock in socklist[0] or pending_bytes > 0:
1164            rc = self.loop_read()
1165            if rc or self._sock is None:
1166                return rc
1167
1168        if self._sockpairR and self._sockpairR in socklist[0]:
1169            # Stimulate output write even though we didn't ask for it, because
1170            # at that point the publish or other command wasn't present.
1171            socklist[1].insert(0, self._sock)
1172            # Clear sockpairR - only ever a single byte written.
1173            try:
1174                # Read many bytes at once - this allows up to 10000 calls to
1175                # publish() inbetween calls to loop().
1176                self._sockpairR.recv(10000)
1177            except BlockingIOError:
1178                pass
1179
1180        if self._sock in socklist[1]:
1181            rc = self.loop_write()
1182            if rc or self._sock is None:
1183                return rc
1184
1185        return self.loop_misc()
1186
1187    def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
1188        """Publish a message on a topic.
1189
1190        This causes a message to be sent to the broker and subsequently from
1191        the broker to any clients subscribing to matching topics.
1192
1193        topic: The topic that the message should be published on.
1194        payload: The actual message to send. If not given, or set to None a
1195        zero length message will be used. Passing an int or float will result
1196        in the payload being converted to a string representing that number. If
1197        you wish to send a true int/float, use struct.pack() to create the
1198        payload you require.
1199        qos: The quality of service level to use.
1200        retain: If set to true, the message will be set as the "last known
1201        good"/retained message for the topic.
1202        properties: (MQTT v5.0 only) the MQTT v5.0 properties to be included.
1203        Use the Properties class.
1204
1205        Returns a MQTTMessageInfo class, which can be used to determine whether
1206        the message has been delivered (using info.is_published()) or to block
1207        waiting for the message to be delivered (info.wait_for_publish()). The
1208        message ID and return code of the publish() call can be found at
1209        info.mid and info.rc.
1210
1211        For backwards compatibility, the MQTTMessageInfo class is iterable so
1212        the old construct of (rc, mid) = client.publish(...) is still valid.
1213
1214        rc is MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the
1215        client is not currently connected.  mid is the message ID for the
1216        publish request. The mid value can be used to track the publish request
1217        by checking against the mid argument in the on_publish() callback if it
1218        is defined.
1219
1220        A ValueError will be raised if topic is None, has zero length or is
1221        invalid (contains a wildcard), except if the MQTT version used is v5.0.
1222        For v5.0, a zero length topic can be used when a Topic Alias has been set.
1223
1224        A ValueError will be raised if qos is not one of 0, 1 or 2, or if
1225        the length of the payload is greater than 268435455 bytes."""
1226        if self._protocol != MQTTv5:
1227            if topic is None or len(topic) == 0:
1228                raise ValueError('Invalid topic.')
1229
1230        topic = topic.encode('utf-8')
1231
1232        if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS:
1233            raise ValueError('Publish topic cannot contain wildcards.')
1234
1235        if qos < 0 or qos > 2:
1236            raise ValueError('Invalid QoS level.')
1237
1238        if isinstance(payload, unicode):
1239            local_payload = payload.encode('utf-8')
1240        elif isinstance(payload, (bytes, bytearray)):
1241            local_payload = payload
1242        elif isinstance(payload, (int, float)):
1243            local_payload = str(payload).encode('ascii')
1244        elif payload is None:
1245            local_payload = b''
1246        else:
1247            raise TypeError(
1248                'payload must be a string, bytearray, int, float or None.')
1249
1250        if len(local_payload) > 268435455:
1251            raise ValueError('Payload too large.')
1252
1253        local_mid = self._mid_generate()
1254
1255        if qos == 0:
1256            info = MQTTMessageInfo(local_mid)
1257            rc = self._send_publish(
1258                local_mid, topic, local_payload, qos, retain, False, info, properties)
1259            info.rc = rc
1260            return info
1261        else:
1262            message = MQTTMessage(local_mid, topic)
1263            message.timestamp = time_func()
1264            message.payload = local_payload
1265            message.qos = qos
1266            message.retain = retain
1267            message.dup = False
1268            message.properties = properties
1269
1270            with self._out_message_mutex:
1271                if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
1272                    message.info.rc = MQTT_ERR_QUEUE_SIZE
1273                    return message.info
1274
1275                if local_mid in self._out_messages:
1276                    message.info.rc = MQTT_ERR_QUEUE_SIZE
1277                    return message.info
1278
1279                self._out_messages[message.mid] = message
1280                if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
1281                    self._inflight_messages += 1
1282                    if qos == 1:
1283                        message.state = mqtt_ms_wait_for_puback
1284                    elif qos == 2:
1285                        message.state = mqtt_ms_wait_for_pubrec
1286
1287                    rc = self._send_publish(message.mid, topic, message.payload, message.qos, message.retain,
1288                                            message.dup, message.info, message.properties)
1289
1290                    # remove from inflight messages so it will be send after a connection is made
1291                    if rc is MQTT_ERR_NO_CONN:
1292                        self._inflight_messages -= 1
1293                        message.state = mqtt_ms_publish
1294
1295                    message.info.rc = rc
1296                    return message.info
1297                else:
1298                    message.state = mqtt_ms_queued
1299                    message.info.rc = MQTT_ERR_SUCCESS
1300                    return message.info
1301
1302    def username_pw_set(self, username, password=None):
1303        """Set a username and optionally a password for broker authentication.
1304
1305        Must be called before connect() to have any effect.
1306        Requires a broker that supports MQTT v3.1.
1307
1308        username: The username to authenticate with. Need have no relationship to the client id. Must be unicode
1309            [MQTT-3.1.3-11].
1310            Set to None to reset client back to not using username/password for broker authentication.
1311        password: The password to authenticate with. Optional, set to None if not required. If it is unicode, then it
1312            will be encoded as UTF-8.
1313        """
1314
1315        # [MQTT-3.1.3-11] User name must be UTF-8 encoded string
1316        self._username = None if username is None else username.encode('utf-8')
1317        self._password = password
1318        if isinstance(self._password, unicode):
1319            self._password = self._password.encode('utf-8')
1320
1321    def enable_bridge_mode(self):
1322        """Sets the client in a bridge mode instead of client mode.
1323
1324        Must be called before connect() to have any effect.
1325        Requires brokers that support bridge mode.
1326
1327        Under bridge mode, the broker will identify the client as a bridge and
1328        not send it's own messages back to it. Hence a subsciption of # is
1329        possible without message loops. This feature also correctly propagates
1330        the retain flag on the messages.
1331
1332        Currently Mosquitto and RSMB support this feature. This feature can
1333        be used to create a bridge between multiple broker.
1334        """
1335        self._client_mode = MQTT_BRIDGE
1336
1337    def is_connected(self):
1338        """Returns the current status of the connection
1339
1340        True if connection exists
1341        False if connection is closed
1342        """
1343        return self._state == mqtt_cs_connected
1344
1345    def disconnect(self, reasoncode=None, properties=None):
1346        """Disconnect a connected client from the broker.
1347        reasoncode: (MQTT v5.0 only) a ReasonCodes instance setting the MQTT v5.0
1348        reasoncode to be sent with the disconnect.  It is optional, the receiver
1349        then assuming that 0 (success) is the value.
1350        properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties
1351        to be included. Optional - if not set, no properties are sent.
1352        """
1353        self._state = mqtt_cs_disconnecting
1354
1355        if self._sock is None:
1356            return MQTT_ERR_NO_CONN
1357
1358        return self._send_disconnect(reasoncode, properties)
1359
1360    def subscribe(self, topic, qos=0, options=None, properties=None):
1361        """Subscribe the client to one or more topics.
1362
1363        This function may be called in three different ways (and a further three for MQTT v5.0):
1364
1365        Simple string and integer
1366        -------------------------
1367        e.g. subscribe("my/topic", 2)
1368
1369        topic: A string specifying the subscription topic to subscribe to.
1370        qos: The desired quality of service level for the subscription.
1371             Defaults to 0.
1372        options and properties: Not used.
1373
1374        Simple string and subscribe options (MQTT v5.0 only)
1375        ----------------------------------------------------
1376        e.g. subscribe("my/topic", options=SubscribeOptions(qos=2))
1377
1378        topic: A string specifying the subscription topic to subscribe to.
1379        qos: Not used.
1380        options: The MQTT v5.0 subscribe options.
1381        properties: a Properties instance setting the MQTT v5.0 properties
1382        to be included. Optional - if not set, no properties are sent.
1383
1384        String and integer tuple
1385        ------------------------
1386        e.g. subscribe(("my/topic", 1))
1387
1388        topic: A tuple of (topic, qos). Both topic and qos must be present in
1389               the tuple.
1390        qos and options: Not used.
1391        properties: Only used for MQTT v5.0.  A Properties instance setting the
1392        MQTT v5.0 properties. Optional - if not set, no properties are sent.
1393
1394        String and subscribe options tuple (MQTT v5.0 only)
1395        ---------------------------------------------------
1396        e.g. subscribe(("my/topic", SubscribeOptions(qos=1)))
1397
1398        topic: A tuple of (topic, SubscribeOptions). Both topic and subscribe
1399                options must be present in the tuple.
1400        qos and options: Not used.
1401        properties: a Properties instance setting the MQTT v5.0 properties
1402        to be included. Optional - if not set, no properties are sent.
1403
1404        List of string and integer tuples
1405        ---------------------------------
1406        e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
1407
1408        This allows multiple topic subscriptions in a single SUBSCRIPTION
1409        command, which is more efficient than using multiple calls to
1410        subscribe().
1411
1412        topic: A list of tuple of format (topic, qos). Both topic and qos must
1413               be present in all of the tuples.
1414        qos, options and properties: Not used.
1415
1416        List of string and subscribe option tuples (MQTT v5.0 only)
1417        -----------------------------------------------------------
1418        e.g. subscribe([("my/topic", SubscribeOptions(qos=0), ("another/topic", SubscribeOptions(qos=2)])
1419
1420        This allows multiple topic subscriptions in a single SUBSCRIPTION
1421        command, which is more efficient than using multiple calls to
1422        subscribe().
1423
1424        topic: A list of tuple of format (topic, SubscribeOptions). Both topic and subscribe
1425                options must be present in all of the tuples.
1426        qos and options: Not used.
1427        properties: a Properties instance setting the MQTT v5.0 properties
1428        to be included. Optional - if not set, no properties are sent.
1429
1430        The function returns a tuple (result, mid), where result is
1431        MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the
1432        client is not currently connected.  mid is the message ID for the
1433        subscribe request. The mid value can be used to track the subscribe
1434        request by checking against the mid argument in the on_subscribe()
1435        callback if it is defined.
1436
1437        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
1438        zero string length, or if topic is not a string, tuple or list.
1439        """
1440        topic_qos_list = None
1441
1442        if isinstance(topic, tuple):
1443            if self._protocol == MQTTv5:
1444                topic, options = topic
1445                if not isinstance(options, SubscribeOptions):
1446                    raise ValueError(
1447                        'Subscribe options must be instance of SubscribeOptions class.')
1448            else:
1449                topic, qos = topic
1450
1451        if isinstance(topic, basestring):
1452            if qos < 0 or qos > 2:
1453                raise ValueError('Invalid QoS level.')
1454            if self._protocol == MQTTv5:
1455                if options is None:
1456                    # if no options are provided, use the QoS passed instead
1457                    options = SubscribeOptions(qos=qos)
1458                elif qos != 0:
1459                    raise ValueError(
1460                        'Subscribe options and qos parameters cannot be combined.')
1461                if not isinstance(options, SubscribeOptions):
1462                    raise ValueError(
1463                        'Subscribe options must be instance of SubscribeOptions class.')
1464                topic_qos_list = [(topic.encode('utf-8'), options)]
1465            else:
1466                if topic is None or len(topic) == 0:
1467                    raise ValueError('Invalid topic.')
1468                topic_qos_list = [(topic.encode('utf-8'), qos)]
1469        elif isinstance(topic, list):
1470            topic_qos_list = []
1471            if self._protocol == MQTTv5:
1472                for t, o in topic:
1473                    if not isinstance(o, SubscribeOptions):
1474                        # then the second value should be QoS
1475                        if o < 0 or o > 2:
1476                            raise ValueError('Invalid QoS level.')
1477                        o = SubscribeOptions(qos=o)
1478                    topic_qos_list.append((t.encode('utf-8'), o))
1479            else:
1480                for t, q in topic:
1481                    if q < 0 or q > 2:
1482                        raise ValueError('Invalid QoS level.')
1483                    if t is None or len(t) == 0 or not isinstance(t, basestring):
1484                        raise ValueError('Invalid topic.')
1485                    topic_qos_list.append((t.encode('utf-8'), q))
1486
1487        if topic_qos_list is None:
1488            raise ValueError("No topic specified, or incorrect topic type.")
1489
1490        if any(self._filter_wildcard_len_check(topic) != MQTT_ERR_SUCCESS for topic, _ in topic_qos_list):
1491            raise ValueError('Invalid subscription filter.')
1492
1493        if self._sock is None:
1494            return (MQTT_ERR_NO_CONN, None)
1495
1496        return self._send_subscribe(False, topic_qos_list, properties)
1497
1498    def unsubscribe(self, topic, properties=None):
1499        """Unsubscribe the client from one or more topics.
1500
1501        topic: A single string, or list of strings that are the subscription
1502               topics to unsubscribe from.
1503        properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties
1504        to be included. Optional - if not set, no properties are sent.
1505
1506        Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
1507        to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not
1508        currently connected.
1509        mid is the message ID for the unsubscribe request. The mid value can be
1510        used to track the unsubscribe request by checking against the mid
1511        argument in the on_unsubscribe() callback if it is defined.
1512
1513        Raises a ValueError if topic is None or has zero string length, or is
1514        not a string or list.
1515        """
1516        topic_list = None
1517        if topic is None:
1518            raise ValueError('Invalid topic.')
1519        if isinstance(topic, basestring):
1520            if len(topic) == 0:
1521                raise ValueError('Invalid topic.')
1522            topic_list = [topic.encode('utf-8')]
1523        elif isinstance(topic, list):
1524            topic_list = []
1525            for t in topic:
1526                if len(t) == 0 or not isinstance(t, basestring):
1527                    raise ValueError('Invalid topic.')
1528                topic_list.append(t.encode('utf-8'))
1529
1530        if topic_list is None:
1531            raise ValueError("No topic specified, or incorrect topic type.")
1532
1533        if self._sock is None:
1534            return (MQTT_ERR_NO_CONN, None)
1535
1536        return self._send_unsubscribe(False, topic_list, properties)
1537
1538    def loop_read(self, max_packets=1):
1539        """Process read network events. Use in place of calling loop() if you
1540        wish to handle your client reads as part of your own application.
1541
1542        Use socket() to obtain the client socket to call select() or equivalent
1543        on.
1544
1545        Do not use if you are using the threaded interface loop_start()."""
1546        if self._sock is None:
1547            return MQTT_ERR_NO_CONN
1548
1549        max_packets = len(self._out_messages) + len(self._in_messages)
1550        if max_packets < 1:
1551            max_packets = 1
1552
1553        for _ in range(0, max_packets):
1554            if self._sock is None:
1555                return MQTT_ERR_NO_CONN
1556            rc = self._packet_read()
1557            if rc > 0:
1558                return self._loop_rc_handle(rc)
1559            elif rc == MQTT_ERR_AGAIN:
1560                return MQTT_ERR_SUCCESS
1561        return MQTT_ERR_SUCCESS
1562
1563    def loop_write(self, max_packets=1):
1564        """Process write network events. Use in place of calling loop() if you
1565        wish to handle your client writes as part of your own application.
1566
1567        Use socket() to obtain the client socket to call select() or equivalent
1568        on.
1569
1570        Use want_write() to determine if there is data waiting to be written.
1571
1572        Do not use if you are using the threaded interface loop_start()."""
1573        if self._sock is None:
1574            return MQTT_ERR_NO_CONN
1575
1576        try:
1577            rc = self._packet_write()
1578            if rc == MQTT_ERR_AGAIN:
1579                return MQTT_ERR_SUCCESS
1580            elif rc > 0:
1581                return self._loop_rc_handle(rc)
1582            else:
1583                return MQTT_ERR_SUCCESS
1584        finally:
1585            if self.want_write():
1586                self._call_socket_register_write()
1587            else:
1588                self._call_socket_unregister_write()
1589
1590    def want_write(self):
1591        """Call to determine if there is network data waiting to be written.
1592        Useful if you are calling select() yourself rather than using loop().
1593        """
1594        try:
1595            packet = self._out_packet.popleft()
1596            self._out_packet.appendleft(packet)
1597            return True
1598        except IndexError:
1599            return False
1600
1601    def loop_misc(self):
1602        """Process miscellaneous network events. Use in place of calling loop() if you
1603        wish to call select() or equivalent on.
1604
1605        Do not use if you are using the threaded interface loop_start()."""
1606        if self._sock is None:
1607            return MQTT_ERR_NO_CONN
1608
1609        now = time_func()
1610        self._check_keepalive()
1611
1612        if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
1613            # client->ping_t != 0 means we are waiting for a pingresp.
1614            # This hasn't happened in the keepalive time so we should disconnect.
1615            self._sock_close()
1616
1617            if self._state == mqtt_cs_disconnecting:
1618                rc = MQTT_ERR_SUCCESS
1619            else:
1620                rc = MQTT_ERR_KEEPALIVE
1621
1622            self._do_on_disconnect(rc)
1623
1624            return MQTT_ERR_CONN_LOST
1625
1626        return MQTT_ERR_SUCCESS
1627
1628    def max_inflight_messages_set(self, inflight):
1629        """Set the maximum number of messages with QoS>0 that can be part way
1630        through their network flow at once. Defaults to 20."""
1631        if inflight < 0:
1632            raise ValueError('Invalid inflight.')
1633        self._max_inflight_messages = inflight
1634
1635    def max_queued_messages_set(self, queue_size):
1636        """Set the maximum number of messages in the outgoing message queue.
1637        0 means unlimited."""
1638        if queue_size < 0:
1639            raise ValueError('Invalid queue size.')
1640        if not isinstance(queue_size, int):
1641            raise ValueError('Invalid type of queue size.')
1642        self._max_queued_messages = queue_size
1643        return self
1644
1645    def message_retry_set(self, retry):
1646        """No longer used, remove in version 2.0"""
1647        pass
1648
1649    def user_data_set(self, userdata):
1650        """Set the user data variable passed to callbacks. May be any data type."""
1651        self._userdata = userdata
1652
1653    def will_set(self, topic, payload=None, qos=0, retain=False, properties=None):
1654        """Set a Will to be sent by the broker in case the client disconnects unexpectedly.
1655
1656        This must be called before connect() to have any effect.
1657
1658        topic: The topic that the will message should be published on.
1659        payload: The message to send as a will. If not given, or set to None a
1660        zero length message will be used as the will. Passing an int or float
1661        will result in the payload being converted to a string representing
1662        that number. If you wish to send a true int/float, use struct.pack() to
1663        create the payload you require.
1664        qos: The quality of service level to use for the will.
1665        retain: If set to true, the will message will be set as the "last known
1666        good"/retained message for the topic.
1667        properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties
1668        to be included with the will message. Optional - if not set, no properties are sent.
1669
1670        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
1671        zero string length.
1672        """
1673        if topic is None or len(topic) == 0:
1674            raise ValueError('Invalid topic.')
1675
1676        if qos < 0 or qos > 2:
1677            raise ValueError('Invalid QoS level.')
1678
1679        if properties != None and not isinstance(properties, Properties):
1680            raise ValueError(
1681                "The properties argument must be an instance of the Properties class.")
1682
1683        if isinstance(payload, unicode):
1684            self._will_payload = payload.encode('utf-8')
1685        elif isinstance(payload, (bytes, bytearray)):
1686            self._will_payload = payload
1687        elif isinstance(payload, (int, float)):
1688            self._will_payload = str(payload).encode('ascii')
1689        elif payload is None:
1690            self._will_payload = b""
1691        else:
1692            raise TypeError(
1693                'payload must be a string, bytearray, int, float or None.')
1694
1695        self._will = True
1696        self._will_topic = topic.encode('utf-8')
1697        self._will_qos = qos
1698        self._will_retain = retain
1699        self._will_properties = properties
1700
1701    def will_clear(self):
1702        """ Removes a will that was previously configured with will_set().
1703
1704        Must be called before connect() to have any effect."""
1705        self._will = False
1706        self._will_topic = b""
1707        self._will_payload = b""
1708        self._will_qos = 0
1709        self._will_retain = False
1710
1711    def socket(self):
1712        """Return the socket or ssl object for this client."""
1713        return self._sock
1714
1715    def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False):
1716        """This function calls the network loop functions for you in an
1717        infinite blocking loop. It is useful for the case where you only want
1718        to run the MQTT client loop in your program.
1719
1720        loop_forever() will handle reconnecting for you if reconnect_on_failure is
1721        true (this is the default behavior). If you call disconnect() in a callback
1722        it will return.
1723
1724
1725        timeout: The time in seconds to wait for incoming/outgoing network
1726          traffic before timing out and returning.
1727        max_packets: Not currently used.
1728        retry_first_connection: Should the first connection attempt be retried on failure.
1729          This is independent of the reconnect_on_failure setting.
1730
1731        Raises OSError/WebsocketConnectionError on first connection failures unless retry_first_connection=True
1732        """
1733
1734        run = True
1735
1736        while run:
1737            if self._thread_terminate is True:
1738                break
1739
1740            if self._state == mqtt_cs_connect_async:
1741                try:
1742                    self.reconnect()
1743                except (OSError, WebsocketConnectionError):
1744                    self._handle_on_connect_fail()
1745                    if not retry_first_connection:
1746                        raise
1747                    self._easy_log(
1748                        MQTT_LOG_DEBUG, "Connection failed, retrying")
1749                    self._reconnect_wait()
1750            else:
1751                break
1752
1753        while run:
1754            rc = MQTT_ERR_SUCCESS
1755            while rc == MQTT_ERR_SUCCESS:
1756                rc = self._loop(timeout)
1757                # We don't need to worry about locking here, because we've
1758                # either called loop_forever() when in single threaded mode, or
1759                # in multi threaded mode when loop_stop() has been called and
1760                # so no other threads can access _out_packet or _messages.
1761                if (self._thread_terminate is True
1762                    and len(self._out_packet) == 0
1763                        and len(self._out_messages) == 0):
1764                    rc = 1
1765                    run = False
1766
1767            def should_exit():
1768                return self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True
1769
1770            if should_exit() or not self._reconnect_on_failure:
1771                run = False
1772            else:
1773                self._reconnect_wait()
1774
1775                if should_exit():
1776                    run = False
1777                else:
1778                    try:
1779                        self.reconnect()
1780                    except (OSError, WebsocketConnectionError):
1781                        self._handle_on_connect_fail()
1782                        self._easy_log(
1783                            MQTT_LOG_DEBUG, "Connection failed, retrying")
1784
1785        return rc
1786
1787    def loop_start(self):
1788        """This is part of the threaded client interface. Call this once to
1789        start a new thread to process network traffic. This provides an
1790        alternative to repeatedly calling loop() yourself.
1791        """
1792        if self._thread is not None:
1793            return MQTT_ERR_INVAL
1794
1795        self._sockpairR, self._sockpairW = _socketpair_compat()
1796        self._thread_terminate = False
1797        self._thread = threading.Thread(target=self._thread_main)
1798        self._thread.daemon = True
1799        self._thread.start()
1800
1801    def loop_stop(self, force=False):
1802        """This is part of the threaded client interface. Call this once to
1803        stop the network thread previously created with loop_start(). This call
1804        will block until the network thread finishes.
1805
1806        The force parameter is currently ignored.
1807        """
1808        if self._thread is None:
1809            return MQTT_ERR_INVAL
1810
1811        self._thread_terminate = True
1812        if threading.current_thread() != self._thread:
1813            self._thread.join()
1814            self._thread = None
1815
1816    @property
1817    def on_log(self):
1818        """If implemented, called when the client has log information.
1819        Defined to allow debugging."""
1820        return self._on_log
1821
1822    @on_log.setter
1823    def on_log(self, func):
1824        """ Define the logging callback implementation.
1825
1826        Expected signature is:
1827            log_callback(client, userdata, level, buf)
1828
1829        client:     the client instance for this callback
1830        userdata:   the private user data as set in Client() or userdata_set()
1831        level:      gives the severity of the message and will be one of
1832                    MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING,
1833                    MQTT_LOG_ERR, and MQTT_LOG_DEBUG.
1834        buf:        the message itself
1835
1836        Decorator: @client.log_callback() (```client``` is the name of the
1837            instance which this callback is being attached to)
1838        """
1839        self._on_log = func
1840
1841    def log_callback(self):
1842        def decorator(func):
1843            self.on_log = func
1844            return func
1845        return decorator
1846
1847    @property
1848    def on_connect(self):
1849        """If implemented, called when the broker responds to our connection
1850        request."""
1851        return self._on_connect
1852
1853    @on_connect.setter
1854    def on_connect(self, func):
1855        """ Define the connect callback implementation.
1856
1857        Expected signature for MQTT v3.1 and v3.1.1 is:
1858            connect_callback(client, userdata, flags, rc)
1859
1860        and for MQTT v5.0:
1861            connect_callback(client, userdata, flags, reasonCode, properties)
1862
1863        client:     the client instance for this callback
1864        userdata:   the private user data as set in Client() or userdata_set()
1865        flags:      response flags sent by the broker
1866        rc:         the connection result
1867        reasonCode: the MQTT v5.0 reason code: an instance of the ReasonCode class.
1868                    ReasonCode may be compared to integer.
1869        properties: the MQTT v5.0 properties returned from the broker.  An instance
1870                    of the Properties class.
1871                    For MQTT v3.1 and v3.1.1 properties is not provided but for compatibility
1872                    with MQTT v5.0, we recommend adding properties=None.
1873
1874        flags is a dict that contains response flags from the broker:
1875            flags['session present'] - this flag is useful for clients that are
1876                using clean session set to 0 only. If a client with clean
1877                session=0, that reconnects to a broker that it has previously
1878                connected to, this flag indicates whether the broker still has the
1879                session information for the client. If 1, the session still exists.
1880
1881        The value of rc indicates success or not:
1882            0: Connection successful
1883            1: Connection refused - incorrect protocol version
1884            2: Connection refused - invalid client identifier
1885            3: Connection refused - server unavailable
1886            4: Connection refused - bad username or password
1887            5: Connection refused - not authorised
1888            6-255: Currently unused.
1889
1890        Decorator: @client.connect_callback() (```client``` is the name of the
1891            instance which this callback is being attached to)
1892
1893        """
1894        with self._callback_mutex:
1895            self._on_connect = func
1896
1897    def connect_callback(self):
1898        def decorator(func):
1899            self.on_connect = func
1900            return func
1901        return decorator
1902
1903    @property
1904    def on_connect_fail(self):
1905        """If implemented, called when the client failed to connect
1906        to the broker."""
1907        return self._on_connect_fail
1908
1909    @on_connect_fail.setter
1910    def on_connect_fail(self, func):
1911        """ Define the connection failure callback implementation
1912
1913        Expected signature is:
1914            on_connect_fail(client, userdata)
1915
1916        client:     the client instance for this callback
1917        userdata:   the private user data as set in Client() or userdata_set()
1918
1919        Decorator: @client.connect_fail_callback() (```client``` is the name of the
1920            instance which this callback is being attached to)
1921
1922        """
1923        with self._callback_mutex:
1924            self._on_connect_fail = func
1925
1926    def connect_fail_callback(self):
1927        def decorator(func):
1928            self.on_connect_fail = func
1929            return func
1930        return decorator
1931
1932    @property
1933    def on_subscribe(self):
1934        """If implemented, called when the broker responds to a subscribe
1935        request."""
1936        return self._on_subscribe
1937
1938    @on_subscribe.setter
1939    def on_subscribe(self, func):
1940        """ Define the subscribe callback implementation.
1941
1942        Expected signature for MQTT v3.1.1 and v3.1 is:
1943            subscribe_callback(client, userdata, mid, granted_qos)
1944
1945        and for MQTT v5.0:
1946            subscribe_callback(client, userdata, mid, reasonCodes, properties)
1947
1948        client:         the client instance for this callback
1949        userdata:       the private user data as set in Client() or userdata_set()
1950        mid:            matches the mid variable returned from the corresponding
1951                        subscribe() call.
1952        granted_qos:    list of integers that give the QoS level the broker has
1953                        granted for each of the different subscription requests.
1954        reasonCodes:    the MQTT v5.0 reason codes received from the broker for each
1955                        subscription.  A list of ReasonCodes instances.
1956        properties:     the MQTT v5.0 properties received from the broker.  A
1957                        list of Properties class instances.
1958
1959        Decorator: @client.subscribe_callback() (```client``` is the name of the
1960            instance which this callback is being attached to)
1961        """
1962        with self._callback_mutex:
1963            self._on_subscribe = func
1964
1965    def subscribe_callback(self):
1966        def decorator(func):
1967            self.on_subscribe = func
1968            return func
1969        return decorator
1970
1971    @property
1972    def on_message(self):
1973        """If implemented, called when a message has been received on a topic
1974        that the client subscribes to.
1975
1976        This callback will be called for every message received. Use
1977        message_callback_add() to define multiple callbacks that will be called
1978        for specific topic filters."""
1979        return self._on_message
1980
1981    @on_message.setter
1982    def on_message(self, func):
1983        """ Define the message received callback implementation.
1984
1985        Expected signature is:
1986            on_message_callback(client, userdata, message)
1987
1988        client:     the client instance for this callback
1989        userdata:   the private user data as set in Client() or userdata_set()
1990        message:    an instance of MQTTMessage.
1991                    This is a class with members topic, payload, qos, retain.
1992
1993        Decorator: @client.message_callback() (```client``` is the name of the
1994            instance which this callback is being attached to)
1995
1996        """
1997        with self._callback_mutex:
1998            self._on_message = func
1999
2000    def message_callback(self):
2001        def decorator(func):
2002            self.on_message = func
2003            return func
2004        return decorator
2005
2006    @property
2007    def on_publish(self):
2008        """If implemented, called when a message that was to be sent using the
2009        publish() call has completed transmission to the broker.
2010
2011        For messages with QoS levels 1 and 2, this means that the appropriate
2012        handshakes have completed. For QoS 0, this simply means that the message
2013        has left the client.
2014        This callback is important because even if the publish() call returns
2015        success, it does not always mean that the message has been sent."""
2016        return self._on_publish
2017
2018    @on_publish.setter
2019    def on_publish(self, func):
2020        """ Define the published message callback implementation.
2021
2022        Expected signature is:
2023            on_publish_callback(client, userdata, mid)
2024
2025        client:     the client instance for this callback
2026        userdata:   the private user data as set in Client() or userdata_set()
2027        mid:        matches the mid variable returned from the corresponding
2028                    publish() call, to allow outgoing messages to be tracked.
2029
2030        Decorator: @client.publish_callback() (```client``` is the name of the
2031            instance which this callback is being attached to)
2032
2033        """
2034        with self._callback_mutex:
2035            self._on_publish = func
2036
2037    def publish_callback(self):
2038        def decorator(func):
2039            self.on_publish = func
2040            return func
2041        return decorator
2042
2043    @property
2044    def on_unsubscribe(self):
2045        """If implemented, called when the broker responds to an unsubscribe
2046        request."""
2047        return self._on_unsubscribe
2048
2049    @on_unsubscribe.setter
2050    def on_unsubscribe(self, func):
2051        """ Define the unsubscribe callback implementation.
2052
2053        Expected signature for MQTT v3.1.1 and v3.1 is:
2054            unsubscribe_callback(client, userdata, mid)
2055
2056        and for MQTT v5.0:
2057            unsubscribe_callback(client, userdata, mid, properties, reasonCodes)
2058
2059        client:         the client instance for this callback
2060        userdata:       the private user data as set in Client() or userdata_set()
2061        mid:            matches the mid variable returned from the corresponding
2062                        unsubscribe() call.
2063        properties:     the MQTT v5.0 properties received from the broker.  A
2064                        list of Properties class instances.
2065        reasonCodes:    the MQTT v5.0 reason codes received from the broker for each
2066                        unsubscribe topic.  A list of ReasonCodes instances
2067
2068        Decorator: @client.unsubscribe_callback() (```client``` is the name of the
2069            instance which this callback is being attached to)
2070        """
2071        with self._callback_mutex:
2072            self._on_unsubscribe = func
2073
2074    def unsubscribe_callback(self):
2075        def decorator(func):
2076            self.on_unsubscribe = func
2077            return func
2078        return decorator
2079
2080    @property
2081    def on_disconnect(self):
2082        """If implemented, called when the client disconnects from the broker.
2083        """
2084        return self._on_disconnect
2085
2086    @on_disconnect.setter
2087    def on_disconnect(self, func):
2088        """ Define the disconnect callback implementation.
2089
2090        Expected signature for MQTT v3.1.1 and v3.1 is:
2091            disconnect_callback(client, userdata, rc)
2092
2093        and for MQTT v5.0:
2094            disconnect_callback(client, userdata, reasonCode, properties)
2095
2096        client:     the client instance for this callback
2097        userdata:   the private user data as set in Client() or userdata_set()
2098        rc:         the disconnection result
2099                    The rc parameter indicates the disconnection state. If
2100                    MQTT_ERR_SUCCESS (0), the callback was called in response to
2101                    a disconnect() call. If any other value the disconnection
2102                    was unexpected, such as might be caused by a network error.
2103
2104        Decorator: @client.disconnect_callback() (```client``` is the name of the
2105            instance which this callback is being attached to)
2106
2107        """
2108        with self._callback_mutex:
2109            self._on_disconnect = func
2110
2111    def disconnect_callback(self):
2112        def decorator(func):
2113            self.on_disconnect = func
2114            return func
2115        return decorator
2116
2117    @property
2118    def on_socket_open(self):
2119        """If implemented, called just after the socket was opend."""
2120        return self._on_socket_open
2121
2122    @on_socket_open.setter
2123    def on_socket_open(self, func):
2124        """Define the socket_open callback implementation.
2125
2126        This should be used to register the socket to an external event loop for reading.
2127
2128        Expected signature is:
2129            socket_open_callback(client, userdata, socket)
2130
2131        client:     the client instance for this callback
2132        userdata:   the private user data as set in Client() or userdata_set()
2133        sock:       the socket which was just opened.
2134
2135        Decorator: @client.socket_open_callback() (```client``` is the name of the
2136            instance which this callback is being attached to)
2137        """
2138        with self._callback_mutex:
2139            self._on_socket_open = func
2140
2141    def socket_open_callback(self):
2142        def decorator(func):
2143            self.on_socket_open = func
2144            return func
2145        return decorator
2146
2147    def _call_socket_open(self):
2148        """Call the socket_open callback with the just-opened socket"""
2149        with self._callback_mutex:
2150            on_socket_open = self.on_socket_open
2151
2152        if on_socket_open:
2153            with self._in_callback_mutex:
2154                try:
2155                    on_socket_open(self, self._userdata, self._sock)
2156                except Exception as err:
2157                    self._easy_log(
2158                        MQTT_LOG_ERR, 'Caught exception in on_socket_open: %s', err)
2159                    if not self.suppress_exceptions:
2160                        raise
2161
2162    @property
2163    def on_socket_close(self):
2164        """If implemented, called just before the socket is closed."""
2165        return self._on_socket_close
2166
2167    @on_socket_close.setter
2168    def on_socket_close(self, func):
2169        """Define the socket_close callback implementation.
2170
2171        This should be used to unregister the socket from an external event loop for reading.
2172
2173        Expected signature is:
2174            socket_close_callback(client, userdata, socket)
2175
2176        client:     the client instance for this callback
2177        userdata:   the private user data as set in Client() or userdata_set()
2178        sock:       the socket which is about to be closed.
2179
2180        Decorator: @client.socket_close_callback() (```client``` is the name of the
2181            instance which this callback is being attached to)
2182        """
2183        with self._callback_mutex:
2184            self._on_socket_close = func
2185
2186    def socket_close_callback(self):
2187        def decorator(func):
2188            self.on_socket_close = func
2189            return func
2190        return decorator
2191
2192    def _call_socket_close(self, sock):
2193        """Call the socket_close callback with the about-to-be-closed socket"""
2194        with self._callback_mutex:
2195            on_socket_close = self.on_socket_close
2196
2197        if on_socket_close:
2198            with self._in_callback_mutex:
2199                try:
2200                    on_socket_close(self, self._userdata, sock)
2201                except Exception as err:
2202                    self._easy_log(
2203                        MQTT_LOG_ERR, 'Caught exception in on_socket_close: %s', err)
2204                    if not self.suppress_exceptions:
2205                        raise
2206
2207    @property
2208    def on_socket_register_write(self):
2209        """If implemented, called when the socket needs writing but can't."""
2210        return self._on_socket_register_write
2211
2212    @on_socket_register_write.setter
2213    def on_socket_register_write(self, func):
2214        """Define the socket_register_write callback implementation.
2215
2216        This should be used to register the socket with an external event loop for writing.
2217
2218        Expected signature is:
2219            socket_register_write_callback(client, userdata, socket)
2220
2221        client:     the client instance for this callback
2222        userdata:   the private user data as set in Client() or userdata_set()
2223        sock:       the socket which should be registered for writing
2224
2225        Decorator: @client.socket_register_write_callback() (```client``` is the name of the
2226            instance which this callback is being attached to)
2227        """
2228        with self._callback_mutex:
2229            self._on_socket_register_write = func
2230
2231    def socket_register_write_callback(self):
2232        def decorator(func):
2233            self._on_socket_register_write = func
2234            return func
2235        return decorator
2236
2237    def _call_socket_register_write(self):
2238        """Call the socket_register_write callback with the unwritable socket"""
2239        if not self._sock or self._registered_write:
2240            return
2241        self._registered_write = True
2242        with self._callback_mutex:
2243            on_socket_register_write = self.on_socket_register_write
2244
2245        if on_socket_register_write:
2246            try:
2247                on_socket_register_write(
2248                    self, self._userdata, self._sock)
2249            except Exception as err:
2250                self._easy_log(
2251                    MQTT_LOG_ERR, 'Caught exception in on_socket_register_write: %s', err)
2252                if not self.suppress_exceptions:
2253                    raise
2254
2255    @property
2256    def on_socket_unregister_write(self):
2257        """If implemented, called when the socket doesn't need writing anymore."""
2258        return self._on_socket_unregister_write
2259
2260    @on_socket_unregister_write.setter
2261    def on_socket_unregister_write(self, func):
2262        """Define the socket_unregister_write callback implementation.
2263
2264        This should be used to unregister the socket from an external event loop for writing.
2265
2266        Expected signature is:
2267            socket_unregister_write_callback(client, userdata, socket)
2268
2269        client:     the client instance for this callback
2270        userdata:   the private user data as set in Client() or userdata_set()
2271        sock:       the socket which should be unregistered for writing
2272
2273        Decorator: @client.socket_unregister_write_callback() (```client``` is the name of the
2274            instance which this callback is being attached to)
2275        """
2276        with self._callback_mutex:
2277            self._on_socket_unregister_write = func
2278
2279    def socket_unregister_write_callback(self):
2280        def decorator(func):
2281            self._on_socket_unregister_write = func
2282            return func
2283        return decorator
2284
2285    def _call_socket_unregister_write(self, sock=None):
2286        """Call the socket_unregister_write callback with the writable socket"""
2287        sock = sock or self._sock
2288        if not sock or not self._registered_write:
2289            return
2290        self._registered_write = False
2291
2292        with self._callback_mutex:
2293            on_socket_unregister_write = self.on_socket_unregister_write
2294
2295        if on_socket_unregister_write:
2296            try:
2297                on_socket_unregister_write(self, self._userdata, sock)
2298            except Exception as err:
2299                self._easy_log(
2300                    MQTT_LOG_ERR, 'Caught exception in on_socket_unregister_write: %s', err)
2301                if not self.suppress_exceptions:
2302                    raise
2303
2304    def message_callback_add(self, sub, callback):
2305        """Register a message callback for a specific topic.
2306        Messages that match 'sub' will be passed to 'callback'. Any
2307        non-matching messages will be passed to the default on_message
2308        callback.
2309
2310        Call multiple times with different 'sub' to define multiple topic
2311        specific callbacks.
2312
2313        Topic specific callbacks may be removed with
2314        message_callback_remove()."""
2315        if callback is None or sub is None:
2316            raise ValueError("sub and callback must both be defined.")
2317
2318        with self._callback_mutex:
2319            self._on_message_filtered[sub] = callback
2320
2321    def topic_callback(self, sub):
2322        def decorator(func):
2323            self.message_callback_add(sub, func)
2324            return func
2325        return decorator
2326
2327    def message_callback_remove(self, sub):
2328        """Remove a message callback previously registered with
2329        message_callback_add()."""
2330        if sub is None:
2331            raise ValueError("sub must defined.")
2332
2333        with self._callback_mutex:
2334            try:
2335                del self._on_message_filtered[sub]
2336            except KeyError:  # no such subscription
2337                pass
2338
2339    # ============================================================
2340    # Private functions
2341    # ============================================================
2342
2343    def _loop_rc_handle(self, rc, properties=None):
2344        if rc:
2345            self._sock_close()
2346
2347            if self._state == mqtt_cs_disconnecting:
2348                rc = MQTT_ERR_SUCCESS
2349
2350            self._do_on_disconnect(rc, properties)
2351
2352        return rc
2353
2354    def _packet_read(self):
2355        # This gets called if pselect() indicates that there is network data
2356        # available - ie. at least one byte.  What we do depends on what data we
2357        # already have.
2358        # If we've not got a command, attempt to read one and save it. This should
2359        # always work because it's only a single byte.
2360        # Then try to read the remaining length. This may fail because it is may
2361        # be more than one byte - will need to save data pending next read if it
2362        # does fail.
2363        # Then try to read the remaining payload, where 'payload' here means the
2364        # combined variable header and actual payload. This is the most likely to
2365        # fail due to longer length, so save current data and current position.
2366        # After all data is read, send to _mqtt_handle_packet() to deal with.
2367        # Finally, free the memory and reset everything to starting conditions.
2368        if self._in_packet['command'] == 0:
2369            try:
2370                command = self._sock_recv(1)
2371            except BlockingIOError:
2372                return MQTT_ERR_AGAIN
2373            except ConnectionError as err:
2374                self._easy_log(
2375                    MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
2376                return MQTT_ERR_CONN_LOST
2377            else:
2378                if len(command) == 0:
2379                    return MQTT_ERR_CONN_LOST
2380                command, = struct.unpack("!B", command)
2381                self._in_packet['command'] = command
2382
2383        if self._in_packet['have_remaining'] == 0:
2384            # Read remaining
2385            # Algorithm for decoding taken from pseudo code at
2386            # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
2387            while True:
2388                try:
2389                    byte = self._sock_recv(1)
2390                except BlockingIOError:
2391                    return MQTT_ERR_AGAIN
2392                except ConnectionError as err:
2393                    self._easy_log(
2394                        MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
2395                    return MQTT_ERR_CONN_LOST
2396                else:
2397                    if len(byte) == 0:
2398                        return MQTT_ERR_CONN_LOST
2399                    byte, = struct.unpack("!B", byte)
2400                    self._in_packet['remaining_count'].append(byte)
2401                    # Max 4 bytes length for remaining length as defined by protocol.
2402                    # Anything more likely means a broken/malicious client.
2403                    if len(self._in_packet['remaining_count']) > 4:
2404                        return MQTT_ERR_PROTOCOL
2405
2406                    self._in_packet['remaining_length'] += (
2407                        byte & 127) * self._in_packet['remaining_mult']
2408                    self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128
2409
2410                if (byte & 128) == 0:
2411                    break
2412
2413            self._in_packet['have_remaining'] = 1
2414            self._in_packet['to_process'] = self._in_packet['remaining_length']
2415
2416        count = 100 # Don't get stuck in this loop if we have a huge message.
2417        while self._in_packet['to_process'] > 0:
2418            try:
2419                data = self._sock_recv(self._in_packet['to_process'])
2420            except BlockingIOError:
2421                return MQTT_ERR_AGAIN
2422            except ConnectionError as err:
2423                self._easy_log(
2424                    MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
2425                return MQTT_ERR_CONN_LOST
2426            else:
2427                if len(data) == 0:
2428                    return MQTT_ERR_CONN_LOST
2429                self._in_packet['to_process'] -= len(data)
2430                self._in_packet['packet'] += data
2431            count -= 1
2432            if count == 0:
2433                with self._msgtime_mutex:
2434                    self._last_msg_in = time_func()
2435                return MQTT_ERR_AGAIN
2436
2437        # All data for this packet is read.
2438        self._in_packet['pos'] = 0
2439        rc = self._packet_handle()
2440
2441        # Free data and reset values
2442        self._in_packet = {
2443            'command': 0,
2444            'have_remaining': 0,
2445            'remaining_count': [],
2446            'remaining_mult': 1,
2447            'remaining_length': 0,
2448            'packet': bytearray(b""),
2449            'to_process': 0,
2450            'pos': 0}
2451
2452        with self._msgtime_mutex:
2453            self._last_msg_in = time_func()
2454        return rc
2455
2456    def _packet_write(self):
2457        while True:
2458            try:
2459                packet = self._out_packet.popleft()
2460            except IndexError:
2461                return MQTT_ERR_SUCCESS
2462
2463            try:
2464                write_length = self._sock_send(
2465                    packet['packet'][packet['pos']:])
2466            except (AttributeError, ValueError):
2467                self._out_packet.appendleft(packet)
2468                return MQTT_ERR_SUCCESS
2469            except BlockingIOError:
2470                self._out_packet.appendleft(packet)
2471                return MQTT_ERR_AGAIN
2472            except ConnectionError as err:
2473                self._out_packet.appendleft(packet)
2474                self._easy_log(
2475                    MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
2476                return MQTT_ERR_CONN_LOST
2477
2478            if write_length > 0:
2479                packet['to_process'] -= write_length
2480                packet['pos'] += write_length
2481
2482                if packet['to_process'] == 0:
2483                    if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
2484                        with self._callback_mutex:
2485                            on_publish = self.on_publish
2486
2487                        if on_publish:
2488                            with self._in_callback_mutex:
2489                                try:
2490                                    on_publish(
2491                                        self, self._userdata, packet['mid'])
2492                                except Exception as err:
2493                                    self._easy_log(
2494                                        MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err)
2495                                    if not self.suppress_exceptions:
2496                                        raise
2497
2498                        packet['info']._set_as_published()
2499
2500                    if (packet['command'] & 0xF0) == DISCONNECT:
2501                        with self._msgtime_mutex:
2502                            self._last_msg_out = time_func()
2503
2504                        self._do_on_disconnect(MQTT_ERR_SUCCESS)
2505                        self._sock_close()
2506                        return MQTT_ERR_SUCCESS
2507
2508                else:
2509                    # We haven't finished with this packet
2510                    self._out_packet.appendleft(packet)
2511            else:
2512                break
2513
2514        with self._msgtime_mutex:
2515            self._last_msg_out = time_func()
2516
2517        return MQTT_ERR_SUCCESS
2518
2519    def _easy_log(self, level, fmt, *args):
2520        if self.on_log is not None:
2521            buf = fmt % args
2522            try:
2523                self.on_log(self, self._userdata, level, buf)
2524            except Exception:
2525                # Can't _easy_log this, as we'll recurse until we break
2526                pass  # self._logger will pick this up, so we're fine
2527        if self._logger is not None:
2528            level_std = LOGGING_LEVEL[level]
2529            self._logger.log(level_std, fmt, *args)
2530
2531    def _check_keepalive(self):
2532        if self._keepalive == 0:
2533            return MQTT_ERR_SUCCESS
2534
2535        now = time_func()
2536
2537        with self._msgtime_mutex:
2538            last_msg_out = self._last_msg_out
2539            last_msg_in = self._last_msg_in
2540
2541        if self._sock is not None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
2542            if self._state == mqtt_cs_connected and self._ping_t == 0:
2543                try:
2544                    self._send_pingreq()
2545                except Exception:
2546                    self._sock_close()
2547                    self._do_on_disconnect(MQTT_ERR_CONN_LOST)
2548                else:
2549                    with self._msgtime_mutex:
2550                        self._last_msg_out = now
2551                        self._last_msg_in = now
2552            else:
2553                self._sock_close()
2554
2555                if self._state == mqtt_cs_disconnecting:
2556                    rc = MQTT_ERR_SUCCESS
2557                else:
2558                    rc = MQTT_ERR_KEEPALIVE
2559
2560                self._do_on_disconnect(rc)
2561
2562    def _mid_generate(self):
2563        with self._mid_generate_mutex:
2564            self._last_mid += 1
2565            if self._last_mid == 65536:
2566                self._last_mid = 1
2567            return self._last_mid
2568
2569    @staticmethod
2570    def _topic_wildcard_len_check(topic):
2571        # Search for + or # in a topic. Return MQTT_ERR_INVAL if found.
2572        # Also returns MQTT_ERR_INVAL if the topic string is too long.
2573        # Returns MQTT_ERR_SUCCESS if everything is fine.
2574        if b'+' in topic or b'#' in topic or len(topic) > 65535:
2575            return MQTT_ERR_INVAL
2576        else:
2577            return MQTT_ERR_SUCCESS
2578
2579    @staticmethod
2580    def _filter_wildcard_len_check(sub):
2581        if (len(sub) == 0 or len(sub) > 65535
2582            or any(b'+' in p or b'#' in p for p in sub.split(b'/') if len(p) > 1)
2583                or b'#/' in sub):
2584            return MQTT_ERR_INVAL
2585        else:
2586            return MQTT_ERR_SUCCESS
2587
2588    def _send_pingreq(self):
2589        self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ")
2590        rc = self._send_simple_command(PINGREQ)
2591        if rc == MQTT_ERR_SUCCESS:
2592            self._ping_t = time_func()
2593        return rc
2594
2595    def _send_pingresp(self):
2596        self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP")
2597        return self._send_simple_command(PINGRESP)
2598
2599    def _send_puback(self, mid):
2600        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: %d)", mid)
2601        return self._send_command_with_mid(PUBACK, mid, False)
2602
2603    def _send_pubcomp(self, mid):
2604        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: %d)", mid)
2605        return self._send_command_with_mid(PUBCOMP, mid, False)
2606
2607    def _pack_remaining_length(self, packet, remaining_length):
2608        remaining_bytes = []
2609        while True:
2610            byte = remaining_length % 128
2611            remaining_length = remaining_length // 128
2612            # If there are more digits to encode, set the top bit of this digit
2613            if remaining_length > 0:
2614                byte |= 0x80
2615
2616            remaining_bytes.append(byte)
2617            packet.append(byte)
2618            if remaining_length == 0:
2619                # FIXME - this doesn't deal with incorrectly large payloads
2620                return packet
2621
2622    def _pack_str16(self, packet, data):
2623        if isinstance(data, unicode):
2624            data = data.encode('utf-8')
2625        packet.extend(struct.pack("!H", len(data)))
2626        packet.extend(data)
2627
2628    def _send_publish(self, mid, topic, payload=b'', qos=0, retain=False, dup=False, info=None, properties=None):
2629        # we assume that topic and payload are already properly encoded
2630        assert not isinstance(topic, unicode) and not isinstance(
2631            payload, unicode) and payload is not None
2632
2633        if self._sock is None:
2634            return MQTT_ERR_NO_CONN
2635
2636        command = PUBLISH | ((dup & 0x1) << 3) | (qos << 1) | retain
2637        packet = bytearray()
2638        packet.append(command)
2639
2640        payloadlen = len(payload)
2641        remaining_length = 2 + len(topic) + payloadlen
2642
2643        if payloadlen == 0:
2644            if self._protocol == MQTTv5:
2645                self._easy_log(
2646                    MQTT_LOG_DEBUG,
2647                    "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s (NULL payload)",
2648                    dup, qos, retain, mid, topic, properties
2649                )
2650            else:
2651                self._easy_log(
2652                    MQTT_LOG_DEBUG,
2653                    "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s' (NULL payload)",
2654                    dup, qos, retain, mid, topic
2655                )
2656        else:
2657            if self._protocol == MQTTv5:
2658                self._easy_log(
2659                    MQTT_LOG_DEBUG,
2660                    "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s, ... (%d bytes)",
2661                    dup, qos, retain, mid, topic, properties, payloadlen
2662                )
2663            else:
2664                self._easy_log(
2665                    MQTT_LOG_DEBUG,
2666                    "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', ... (%d bytes)",
2667                    dup, qos, retain, mid, topic, payloadlen
2668                )
2669
2670        if qos > 0:
2671            # For message id
2672            remaining_length += 2
2673
2674        if self._protocol == MQTTv5:
2675            if properties is None:
2676                packed_properties = b'\x00'
2677            else:
2678                packed_properties = properties.pack()
2679            remaining_length += len(packed_properties)
2680
2681        self._pack_remaining_length(packet, remaining_length)
2682        self._pack_str16(packet, topic)
2683
2684        if qos > 0:
2685            # For message id
2686            packet.extend(struct.pack("!H", mid))
2687
2688        if self._protocol == MQTTv5:
2689            packet.extend(packed_properties)
2690
2691        packet.extend(payload)
2692
2693        return self._packet_queue(PUBLISH, packet, mid, qos, info)
2694
2695    def _send_pubrec(self, mid):
2696        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: %d)", mid)
2697        return self._send_command_with_mid(PUBREC, mid, False)
2698
2699    def _send_pubrel(self, mid):
2700        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: %d)", mid)
2701        return self._send_command_with_mid(PUBREL | 2, mid, False)
2702
2703    def _send_command_with_mid(self, command, mid, dup):
2704        # For PUBACK, PUBCOMP, PUBREC, and PUBREL
2705        if dup:
2706            command |= 0x8
2707
2708        remaining_length = 2
2709        packet = struct.pack('!BBH', command, remaining_length, mid)
2710        return self._packet_queue(command, packet, mid, 1)
2711
2712    def _send_simple_command(self, command):
2713        # For DISCONNECT, PINGREQ and PINGRESP
2714        remaining_length = 0
2715        packet = struct.pack('!BB', command, remaining_length)
2716        return self._packet_queue(command, packet, 0, 0)
2717
2718    def _send_connect(self, keepalive):
2719        proto_ver = self._protocol
2720        # hard-coded UTF-8 encoded string
2721        protocol = b"MQTT" if proto_ver >= MQTTv311 else b"MQIsdp"
2722
2723        remaining_length = 2 + len(protocol) + 1 + \
2724            1 + 2 + 2 + len(self._client_id)
2725
2726        connect_flags = 0
2727        if self._protocol == MQTTv5:
2728            if self._clean_start == True:
2729                connect_flags |= 0x02
2730            elif self._clean_start == MQTT_CLEAN_START_FIRST_ONLY and self._mqttv5_first_connect:
2731                connect_flags |= 0x02
2732        elif self._clean_session:
2733            connect_flags |= 0x02
2734
2735        if self._will:
2736            remaining_length += 2 + \
2737                len(self._will_topic) + 2 + len(self._will_payload)
2738            connect_flags |= 0x04 | ((self._will_qos & 0x03) << 3) | (
2739                (self._will_retain & 0x01) << 5)
2740
2741        if self._username is not None:
2742            remaining_length += 2 + len(self._username)
2743            connect_flags |= 0x80
2744            if self._password is not None:
2745                connect_flags |= 0x40
2746                remaining_length += 2 + len(self._password)
2747
2748        if self._protocol == MQTTv5:
2749            if self._connect_properties is None:
2750                packed_connect_properties = b'\x00'
2751            else:
2752                packed_connect_properties = self._connect_properties.pack()
2753            remaining_length += len(packed_connect_properties)
2754            if self._will:
2755                if self._will_properties is None:
2756                    packed_will_properties = b'\x00'
2757                else:
2758                    packed_will_properties = self._will_properties.pack()
2759                remaining_length += len(packed_will_properties)
2760
2761        command = CONNECT
2762        packet = bytearray()
2763        packet.append(command)
2764
2765        # as per the mosquitto broker, if the MSB of this version is set
2766        # to 1, then it treats the connection as a bridge
2767        if self._client_mode == MQTT_BRIDGE:
2768            proto_ver |= 0x80
2769
2770        self._pack_remaining_length(packet, remaining_length)
2771        packet.extend(struct.pack("!H" + str(len(protocol)) + "sBBH", len(protocol), protocol, proto_ver, connect_flags,
2772                                  keepalive))
2773
2774        if self._protocol == MQTTv5:
2775            packet += packed_connect_properties
2776
2777        self._pack_str16(packet, self._client_id)
2778
2779        if self._will:
2780            if self._protocol == MQTTv5:
2781                packet += packed_will_properties
2782            self._pack_str16(packet, self._will_topic)
2783            self._pack_str16(packet, self._will_payload)
2784
2785        if self._username is not None:
2786            self._pack_str16(packet, self._username)
2787
2788            if self._password is not None:
2789                self._pack_str16(packet, self._password)
2790
2791        self._keepalive = keepalive
2792        if self._protocol == MQTTv5:
2793            self._easy_log(
2794                MQTT_LOG_DEBUG,
2795                "Sending CONNECT (u%d, p%d, wr%d, wq%d, wf%d, c%d, k%d) client_id=%s properties=%s",
2796                (connect_flags & 0x80) >> 7,
2797                (connect_flags & 0x40) >> 6,
2798                (connect_flags & 0x20) >> 5,
2799                (connect_flags & 0x18) >> 3,
2800                (connect_flags & 0x4) >> 2,
2801                (connect_flags & 0x2) >> 1,
2802                keepalive,
2803                self._client_id,
2804                self._connect_properties
2805            )
2806        else:
2807            self._easy_log(
2808                MQTT_LOG_DEBUG,
2809                "Sending CONNECT (u%d, p%d, wr%d, wq%d, wf%d, c%d, k%d) client_id=%s",
2810                (connect_flags & 0x80) >> 7,
2811                (connect_flags & 0x40) >> 6,
2812                (connect_flags & 0x20) >> 5,
2813                (connect_flags & 0x18) >> 3,
2814                (connect_flags & 0x4) >> 2,
2815                (connect_flags & 0x2) >> 1,
2816                keepalive,
2817                self._client_id
2818            )
2819        return self._packet_queue(command, packet, 0, 0)
2820
2821    def _send_disconnect(self, reasoncode=None, properties=None):
2822        if self._protocol == MQTTv5:
2823            self._easy_log(MQTT_LOG_DEBUG, "Sending DISCONNECT reasonCode=%s properties=%s",
2824                           reasoncode,
2825                           properties
2826                           )
2827        else:
2828            self._easy_log(MQTT_LOG_DEBUG, "Sending DISCONNECT")
2829
2830        remaining_length = 0
2831
2832        command = DISCONNECT
2833        packet = bytearray()
2834        packet.append(command)
2835
2836        if self._protocol == MQTTv5:
2837            if properties is not None or reasoncode is not None:
2838                if reasoncode is None:
2839                    reasoncode = ReasonCodes(DISCONNECT >> 4, identifier=0)
2840                remaining_length += 1
2841                if properties is not None:
2842                    packed_props = properties.pack()
2843                    remaining_length += len(packed_props)
2844
2845        self._pack_remaining_length(packet, remaining_length)
2846
2847        if self._protocol == MQTTv5:
2848            if reasoncode != None:
2849                packet += reasoncode.pack()
2850                if properties != None:
2851                    packet += packed_props
2852
2853        return self._packet_queue(command, packet, 0, 0)
2854
2855    def _send_subscribe(self, dup, topics, properties=None):
2856        remaining_length = 2
2857        if self._protocol == MQTTv5:
2858            if properties is None:
2859                packed_subscribe_properties = b'\x00'
2860            else:
2861                packed_subscribe_properties = properties.pack()
2862            remaining_length += len(packed_subscribe_properties)
2863        for t, _ in topics:
2864            remaining_length += 2 + len(t) + 1
2865
2866        command = SUBSCRIBE | (dup << 3) | 0x2
2867        packet = bytearray()
2868        packet.append(command)
2869        self._pack_remaining_length(packet, remaining_length)
2870        local_mid = self._mid_generate()
2871        packet.extend(struct.pack("!H", local_mid))
2872
2873        if self._protocol == MQTTv5:
2874            packet += packed_subscribe_properties
2875
2876        for t, q in topics:
2877            self._pack_str16(packet, t)
2878            if self._protocol == MQTTv5:
2879                packet += q.pack()
2880            else:
2881                packet.append(q)
2882
2883        self._easy_log(
2884            MQTT_LOG_DEBUG,
2885            "Sending SUBSCRIBE (d%d, m%d) %s",
2886            dup,
2887            local_mid,
2888            topics,
2889        )
2890        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
2891
2892    def _send_unsubscribe(self, dup, topics, properties=None):
2893        remaining_length = 2
2894        if self._protocol == MQTTv5:
2895            if properties is None:
2896                packed_unsubscribe_properties = b'\x00'
2897            else:
2898                packed_unsubscribe_properties = properties.pack()
2899            remaining_length += len(packed_unsubscribe_properties)
2900        for t in topics:
2901            remaining_length += 2 + len(t)
2902
2903        command = UNSUBSCRIBE | (dup << 3) | 0x2
2904        packet = bytearray()
2905        packet.append(command)
2906        self._pack_remaining_length(packet, remaining_length)
2907        local_mid = self._mid_generate()
2908        packet.extend(struct.pack("!H", local_mid))
2909
2910        if self._protocol == MQTTv5:
2911            packet += packed_unsubscribe_properties
2912
2913        for t in topics:
2914            self._pack_str16(packet, t)
2915
2916        # topics_repr = ", ".join("'"+topic.decode('utf8')+"'" for topic in topics)
2917        if self._protocol == MQTTv5:
2918            self._easy_log(
2919                MQTT_LOG_DEBUG,
2920                "Sending UNSUBSCRIBE (d%d, m%d) %s %s",
2921                dup,
2922                local_mid,
2923                properties,
2924                topics,
2925            )
2926        else:
2927            self._easy_log(
2928                MQTT_LOG_DEBUG,
2929                "Sending UNSUBSCRIBE (d%d, m%d) %s",
2930                dup,
2931                local_mid,
2932                topics,
2933            )
2934        return (self._packet_queue(command, packet, local_mid, 1), local_mid)
2935
2936    def _check_clean_session(self):
2937        if self._protocol == MQTTv5:
2938            if self._clean_start == MQTT_CLEAN_START_FIRST_ONLY:
2939                return self._mqttv5_first_connect
2940            else:
2941                return self._clean_start
2942        else:
2943            return self._clean_session
2944
2945    def _messages_reconnect_reset_out(self):
2946        with self._out_message_mutex:
2947            self._inflight_messages = 0
2948            for m in self._out_messages.values():
2949                m.timestamp = 0
2950                if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
2951                    if m.qos == 0:
2952                        m.state = mqtt_ms_publish
2953                    elif m.qos == 1:
2954                        # self._inflight_messages = self._inflight_messages + 1
2955                        if m.state == mqtt_ms_wait_for_puback:
2956                            m.dup = True
2957                        m.state = mqtt_ms_publish
2958                    elif m.qos == 2:
2959                        # self._inflight_messages = self._inflight_messages + 1
2960                        if self._check_clean_session():
2961                            if m.state != mqtt_ms_publish:
2962                                m.dup = True
2963                            m.state = mqtt_ms_publish
2964                        else:
2965                            if m.state == mqtt_ms_wait_for_pubcomp:
2966                                m.state = mqtt_ms_resend_pubrel
2967                            else:
2968                                if m.state == mqtt_ms_wait_for_pubrec:
2969                                    m.dup = True
2970                                m.state = mqtt_ms_publish
2971                else:
2972                    m.state = mqtt_ms_queued
2973
2974    def _messages_reconnect_reset_in(self):
2975        with self._in_message_mutex:
2976            if self._check_clean_session():
2977                self._in_messages = collections.OrderedDict()
2978                return
2979            for m in self._in_messages.values():
2980                m.timestamp = 0
2981                if m.qos != 2:
2982                    self._in_messages.pop(m.mid)
2983                else:
2984                    # Preserve current state
2985                    pass
2986
2987    def _messages_reconnect_reset(self):
2988        self._messages_reconnect_reset_out()
2989        self._messages_reconnect_reset_in()
2990
2991    def _packet_queue(self, command, packet, mid, qos, info=None):
2992        mpkt = {
2993            'command': command,
2994            'mid': mid,
2995            'qos': qos,
2996            'pos': 0,
2997            'to_process': len(packet),
2998            'packet': packet,
2999            'info': info}
3000
3001        self._out_packet.append(mpkt)
3002
3003        # Write a single byte to sockpairW (connected to sockpairR) to break
3004        # out of select() if in threaded mode.
3005        if self._sockpairW is not None:
3006            try:
3007                self._sockpairW.send(sockpair_data)
3008            except BlockingIOError:
3009                pass
3010
3011        # If we have an external event loop registered, use that instead
3012        # of calling loop_write() directly.
3013        if self._thread is None and self._on_socket_register_write is None:
3014            if self._in_callback_mutex.acquire(False):
3015                self._in_callback_mutex.release()
3016                return self.loop_write()
3017
3018        self._call_socket_register_write()
3019
3020        return MQTT_ERR_SUCCESS
3021
3022    def _packet_handle(self):
3023        cmd = self._in_packet['command'] & 0xF0
3024        if cmd == PINGREQ:
3025            return self._handle_pingreq()
3026        elif cmd == PINGRESP:
3027            return self._handle_pingresp()
3028        elif cmd == PUBACK:
3029            return self._handle_pubackcomp("PUBACK")
3030        elif cmd == PUBCOMP:
3031            return self._handle_pubackcomp("PUBCOMP")
3032        elif cmd == PUBLISH:
3033            return self._handle_publish()
3034        elif cmd == PUBREC:
3035            return self._handle_pubrec()
3036        elif cmd == PUBREL:
3037            return self._handle_pubrel()
3038        elif cmd == CONNACK:
3039            return self._handle_connack()
3040        elif cmd == SUBACK:
3041            return self._handle_suback()
3042        elif cmd == UNSUBACK:
3043            return self._handle_unsuback()
3044        elif cmd == DISCONNECT and self._protocol == MQTTv5:  # only allowed in MQTT 5.0
3045            return self._handle_disconnect()
3046        else:
3047            # If we don't recognise the command, return an error straight away.
3048            self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command %s", cmd)
3049            return MQTT_ERR_PROTOCOL
3050
3051    def _handle_pingreq(self):
3052        if self._in_packet['remaining_length'] != 0:
3053            return MQTT_ERR_PROTOCOL
3054
3055        self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ")
3056        return self._send_pingresp()
3057
3058    def _handle_pingresp(self):
3059        if self._in_packet['remaining_length'] != 0:
3060            return MQTT_ERR_PROTOCOL
3061
3062        # No longer waiting for a PINGRESP.
3063        self._ping_t = 0
3064        self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP")
3065        return MQTT_ERR_SUCCESS
3066
3067    def _handle_connack(self):
3068        if self._protocol == MQTTv5:
3069            if self._in_packet['remaining_length'] < 2:
3070                return MQTT_ERR_PROTOCOL
3071        elif self._in_packet['remaining_length'] != 2:
3072            return MQTT_ERR_PROTOCOL
3073
3074        if self._protocol == MQTTv5:
3075            (flags, result) = struct.unpack(
3076                "!BB", self._in_packet['packet'][:2])
3077            if result == 1:
3078                # This is probably a failure from a broker that doesn't support
3079                # MQTT v5.
3080                reason = 132 # Unsupported protocol version
3081                properties = None
3082            else:
3083                reason = ReasonCodes(CONNACK >> 4, identifier=result)
3084                properties = Properties(CONNACK >> 4)
3085                properties.unpack(self._in_packet['packet'][2:])
3086        else:
3087            (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
3088        if self._protocol == MQTTv311:
3089            if result == CONNACK_REFUSED_PROTOCOL_VERSION:
3090                if not self._reconnect_on_failure:
3091                    return MQTT_ERR_PROTOCOL
3092                self._easy_log(
3093                    MQTT_LOG_DEBUG,
3094                    "Received CONNACK (%s, %s), attempting downgrade to MQTT v3.1.",
3095                    flags, result
3096                )
3097                # Downgrade to MQTT v3.1
3098                self._protocol = MQTTv31
3099                return self.reconnect()
3100            elif (result == CONNACK_REFUSED_IDENTIFIER_REJECTED
3101                    and self._client_id == b''):
3102                if not self._reconnect_on_failure:
3103                    return MQTT_ERR_PROTOCOL
3104                self._easy_log(
3105                    MQTT_LOG_DEBUG,
3106                    "Received CONNACK (%s, %s), attempting to use non-empty CID",
3107                    flags, result,
3108                )
3109                self._client_id = base62(uuid.uuid4().int, padding=22)
3110                return self.reconnect()
3111
3112        if result == 0:
3113            self._state = mqtt_cs_connected
3114            self._reconnect_delay = None
3115
3116        if self._protocol == MQTTv5:
3117            self._easy_log(
3118                MQTT_LOG_DEBUG, "Received CONNACK (%s, %s) properties=%s", flags, reason, properties)
3119        else:
3120            self._easy_log(
3121                MQTT_LOG_DEBUG, "Received CONNACK (%s, %s)", flags, result)
3122
3123        # it won't be the first successful connect any more
3124        self._mqttv5_first_connect = False
3125
3126        with self._callback_mutex:
3127            on_connect = self.on_connect
3128
3129        if on_connect:
3130            flags_dict = {}
3131            flags_dict['session present'] = flags & 0x01
3132            with self._in_callback_mutex:
3133                try:
3134                    if self._protocol == MQTTv5:
3135                        on_connect(self, self._userdata,
3136                                        flags_dict, reason, properties)
3137                    else:
3138                        on_connect(
3139                            self, self._userdata, flags_dict, result)
3140                except Exception as err:
3141                    self._easy_log(
3142                        MQTT_LOG_ERR, 'Caught exception in on_connect: %s', err)
3143                    if not self.suppress_exceptions:
3144                        raise
3145
3146        if result == 0:
3147            rc = 0
3148            with self._out_message_mutex:
3149                for m in self._out_messages.values():
3150                    m.timestamp = time_func()
3151                    if m.state == mqtt_ms_queued:
3152                        self.loop_write()  # Process outgoing messages that have just been queued up
3153                        return MQTT_ERR_SUCCESS
3154
3155                    if m.qos == 0:
3156                        with self._in_callback_mutex:  # Don't call loop_write after _send_publish()
3157                            rc = self._send_publish(
3158                                m.mid,
3159                                m.topic.encode('utf-8'),
3160                                m.payload,
3161                                m.qos,
3162                                m.retain,
3163                                m.dup,
3164                                properties=m.properties
3165                            )
3166                        if rc != 0:
3167                            return rc
3168                    elif m.qos == 1:
3169                        if m.state == mqtt_ms_publish:
3170                            self._inflight_messages += 1
3171                            m.state = mqtt_ms_wait_for_puback
3172                            with self._in_callback_mutex:  # Don't call loop_write after _send_publish()
3173                                rc = self._send_publish(
3174                                    m.mid,
3175                                    m.topic.encode('utf-8'),
3176                                    m.payload,
3177                                    m.qos,
3178                                    m.retain,
3179                                    m.dup,
3180                                    properties=m.properties
3181                                )
3182                            if rc != 0:
3183                                return rc
3184                    elif m.qos == 2:
3185                        if m.state == mqtt_ms_publish:
3186                            self._inflight_messages += 1
3187                            m.state = mqtt_ms_wait_for_pubrec
3188                            with self._in_callback_mutex:  # Don't call loop_write after _send_publish()
3189                                rc = self._send_publish(
3190                                    m.mid,
3191                                    m.topic.encode('utf-8'),
3192                                    m.payload,
3193                                    m.qos,
3194                                    m.retain,
3195                                    m.dup,
3196                                    properties=m.properties
3197                                )
3198                            if rc != 0:
3199                                return rc
3200                        elif m.state == mqtt_ms_resend_pubrel:
3201                            self._inflight_messages += 1
3202                            m.state = mqtt_ms_wait_for_pubcomp
3203                            with self._in_callback_mutex:  # Don't call loop_write after _send_publish()
3204                                rc = self._send_pubrel(m.mid)
3205                            if rc != 0:
3206                                return rc
3207                    self.loop_write()  # Process outgoing messages that have just been queued up
3208
3209            return rc
3210        elif result > 0 and result < 6:
3211            return MQTT_ERR_CONN_REFUSED
3212        else:
3213            return MQTT_ERR_PROTOCOL
3214
3215    def _handle_disconnect(self):
3216        packet_type = DISCONNECT >> 4
3217        reasonCode = properties = None
3218        if self._in_packet['remaining_length'] > 2:
3219            reasonCode = ReasonCodes(packet_type)
3220            reasonCode.unpack(self._in_packet['packet'])
3221            if self._in_packet['remaining_length'] > 3:
3222                properties = Properties(packet_type)
3223                props, props_len = properties.unpack(
3224                    self._in_packet['packet'][1:])
3225        self._easy_log(MQTT_LOG_DEBUG, "Received DISCONNECT %s %s",
3226                       reasonCode,
3227                       properties
3228                       )
3229
3230        self._loop_rc_handle(reasonCode, properties)
3231
3232        return MQTT_ERR_SUCCESS
3233
3234    def _handle_suback(self):
3235        self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")
3236        pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's'
3237        (mid, packet) = struct.unpack(pack_format, self._in_packet['packet'])
3238
3239        if self._protocol == MQTTv5:
3240            properties = Properties(SUBACK >> 4)
3241            props, props_len = properties.unpack(packet)
3242            reasoncodes = []
3243            for c in packet[props_len:]:
3244                if sys.version_info[0] < 3:
3245                    c = ord(c)
3246                reasoncodes.append(ReasonCodes(SUBACK >> 4, identifier=c))
3247        else:
3248            pack_format = "!" + "B" * len(packet)
3249            granted_qos = struct.unpack(pack_format, packet)
3250
3251        with self._callback_mutex:
3252            on_subscribe = self.on_subscribe
3253
3254        if on_subscribe:
3255            with self._in_callback_mutex:  # Don't call loop_write after _send_publish()
3256                try:
3257                    if self._protocol == MQTTv5:
3258                        on_subscribe(
3259                            self, self._userdata, mid, reasoncodes, properties)
3260                    else:
3261                        on_subscribe(
3262                            self, self._userdata, mid, granted_qos)
3263                except Exception as err:
3264                    self._easy_log(
3265                        MQTT_LOG_ERR, 'Caught exception in on_subscribe: %s', err)
3266                    if not self.suppress_exceptions:
3267                        raise
3268
3269        return MQTT_ERR_SUCCESS
3270
3271    def _handle_publish(self):
3272        rc = 0
3273
3274        header = self._in_packet['command']
3275        message = MQTTMessage()
3276        message.dup = (header & 0x08) >> 3
3277        message.qos = (header & 0x06) >> 1
3278        message.retain = (header & 0x01)
3279
3280        pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's'
3281        (slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
3282        pack_format = '!' + str(slen) + 's' + str(len(packet) - slen) + 's'
3283        (topic, packet) = struct.unpack(pack_format, packet)
3284
3285        if self._protocol != MQTTv5 and len(topic) == 0:
3286            return MQTT_ERR_PROTOCOL
3287
3288        # Handle topics with invalid UTF-8
3289        # This replaces an invalid topic with a message and the hex
3290        # representation of the topic for logging. When the user attempts to
3291        # access message.topic in the callback, an exception will be raised.
3292        try:
3293            print_topic = topic.decode('utf-8')
3294        except UnicodeDecodeError:
3295            print_topic = "TOPIC WITH INVALID UTF-8: " + str(topic)
3296
3297        message.topic = topic
3298
3299        if message.qos > 0:
3300            pack_format = "!H" + str(len(packet) - 2) + 's'
3301            (message.mid, packet) = struct.unpack(pack_format, packet)
3302
3303        if self._protocol == MQTTv5:
3304            message.properties = Properties(PUBLISH >> 4)
3305            props, props_len = message.properties.unpack(packet)
3306            packet = packet[props_len:]
3307
3308        message.payload = packet
3309
3310        if self._protocol == MQTTv5:
3311            self._easy_log(
3312                MQTT_LOG_DEBUG,
3313                "Received PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s, ...  (%d bytes)",
3314                message.dup, message.qos, message.retain, message.mid,
3315                print_topic, message.properties, len(message.payload)
3316            )
3317        else:
3318            self._easy_log(
3319                MQTT_LOG_DEBUG,
3320                "Received PUBLISH (d%d, q%d, r%d, m%d), '%s', ...  (%d bytes)",
3321                message.dup, message.qos, message.retain, message.mid,
3322                print_topic, len(message.payload)
3323            )
3324
3325        message.timestamp = time_func()
3326        if message.qos == 0:
3327            self._handle_on_message(message)
3328            return MQTT_ERR_SUCCESS
3329        elif message.qos == 1:
3330            self._handle_on_message(message)
3331            return self._send_puback(message.mid)
3332        elif message.qos == 2:
3333            rc = self._send_pubrec(message.mid)
3334            message.state = mqtt_ms_wait_for_pubrel
3335            with self._in_message_mutex:
3336                self._in_messages[message.mid] = message
3337            return rc
3338        else:
3339            return MQTT_ERR_PROTOCOL
3340
3341    def _handle_pubrel(self):
3342        if self._protocol == MQTTv5:
3343            if self._in_packet['remaining_length'] < 2:
3344                return MQTT_ERR_PROTOCOL
3345        elif self._in_packet['remaining_length'] != 2:
3346            return MQTT_ERR_PROTOCOL
3347
3348        mid, = struct.unpack("!H", self._in_packet['packet'])
3349        self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: %d)", mid)
3350
3351        with self._in_message_mutex:
3352            if mid in self._in_messages:
3353                # Only pass the message on if we have removed it from the queue - this
3354                # prevents multiple callbacks for the same message.
3355                message = self._in_messages.pop(mid)
3356                self._handle_on_message(message)
3357                self._inflight_messages -= 1
3358                if self._max_inflight_messages > 0:
3359                    with self._out_message_mutex:
3360                        rc = self._update_inflight()
3361                    if rc != MQTT_ERR_SUCCESS:
3362                        return rc
3363
3364        # FIXME: this should only be done if the message is known
3365        # If unknown it's a protocol error and we should close the connection.
3366        # But since we don't have (on disk) persistence for the session, it
3367        # is possible that we must known about this message.
3368        # Choose to acknwoledge this messsage (and thus losing a message) but
3369        # avoid hanging. See #284.
3370        return self._send_pubcomp(mid)
3371
3372    def _update_inflight(self):
3373        # Dont lock message_mutex here
3374        for m in self._out_messages.values():
3375            if self._inflight_messages < self._max_inflight_messages:
3376                if m.qos > 0 and m.state == mqtt_ms_queued:
3377                    self._inflight_messages += 1
3378                    if m.qos == 1:
3379                        m.state = mqtt_ms_wait_for_puback
3380                    elif m.qos == 2:
3381                        m.state = mqtt_ms_wait_for_pubrec
3382                    rc = self._send_publish(
3383                        m.mid,
3384                        m.topic.encode('utf-8'),
3385                        m.payload,
3386                        m.qos,
3387                        m.retain,
3388                        m.dup,
3389                        properties=m.properties,
3390                    )
3391                    if rc != 0:
3392                        return rc
3393            else:
3394                return MQTT_ERR_SUCCESS
3395        return MQTT_ERR_SUCCESS
3396
3397    def _handle_pubrec(self):
3398        if self._protocol == MQTTv5:
3399            if self._in_packet['remaining_length'] < 2:
3400                return MQTT_ERR_PROTOCOL
3401        elif self._in_packet['remaining_length'] != 2:
3402            return MQTT_ERR_PROTOCOL
3403
3404        mid, = struct.unpack("!H", self._in_packet['packet'][:2])
3405        if self._protocol == MQTTv5:
3406            if self._in_packet['remaining_length'] > 2:
3407                reasonCode = ReasonCodes(PUBREC >> 4)
3408                reasonCode.unpack(self._in_packet['packet'][2:])
3409                if self._in_packet['remaining_length'] > 3:
3410                    properties = Properties(PUBREC >> 4)
3411                    props, props_len = properties.unpack(
3412                        self._in_packet['packet'][3:])
3413        self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: %d)", mid)
3414
3415        with self._out_message_mutex:
3416            if mid in self._out_messages:
3417                msg = self._out_messages[mid]
3418                msg.state = mqtt_ms_wait_for_pubcomp
3419                msg.timestamp = time_func()
3420                return self._send_pubrel(mid)
3421
3422        return MQTT_ERR_SUCCESS
3423
3424    def _handle_unsuback(self):
3425        if self._protocol == MQTTv5:
3426            if self._in_packet['remaining_length'] < 4:
3427                return MQTT_ERR_PROTOCOL
3428        elif self._in_packet['remaining_length'] != 2:
3429            return MQTT_ERR_PROTOCOL
3430
3431        mid, = struct.unpack("!H", self._in_packet['packet'][:2])
3432        if self._protocol == MQTTv5:
3433            packet = self._in_packet['packet'][2:]
3434            properties = Properties(UNSUBACK >> 4)
3435            props, props_len = properties.unpack(packet)
3436            reasoncodes = []
3437            for c in packet[props_len:]:
3438                if sys.version_info[0] < 3:
3439                    c = ord(c)
3440                reasoncodes.append(ReasonCodes(UNSUBACK >> 4, identifier=c))
3441            if len(reasoncodes) == 1:
3442                reasoncodes = reasoncodes[0]
3443
3444        self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid)
3445        with self._callback_mutex:
3446            on_unsubscribe = self.on_unsubscribe
3447
3448        if on_unsubscribe:
3449            with self._in_callback_mutex:
3450                try:
3451                    if self._protocol == MQTTv5:
3452                        on_unsubscribe(
3453                            self, self._userdata, mid, properties, reasoncodes)
3454                    else:
3455                        on_unsubscribe(self, self._userdata, mid)
3456                except Exception as err:
3457                    self._easy_log(
3458                        MQTT_LOG_ERR, 'Caught exception in on_unsubscribe: %s', err)
3459                    if not self.suppress_exceptions:
3460                        raise
3461
3462        return MQTT_ERR_SUCCESS
3463
3464    def _do_on_disconnect(self, rc, properties=None):
3465        with self._callback_mutex:
3466            on_disconnect = self.on_disconnect
3467
3468        if on_disconnect:
3469            with self._in_callback_mutex:
3470                try:
3471                    if self._protocol == MQTTv5:
3472                        on_disconnect(
3473                            self, self._userdata, rc, properties)
3474                    else:
3475                        on_disconnect(self, self._userdata, rc)
3476                except Exception as err:
3477                    self._easy_log(
3478                        MQTT_LOG_ERR, 'Caught exception in on_disconnect: %s', err)
3479                    if not self.suppress_exceptions:
3480                        raise
3481
3482    def _do_on_publish(self, mid):
3483        with self._callback_mutex:
3484            on_publish = self.on_publish
3485
3486        if on_publish:
3487            with self._in_callback_mutex:
3488                try:
3489                    on_publish(self, self._userdata, mid)
3490                except Exception as err:
3491                    self._easy_log(
3492                        MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err)
3493                    if not self.suppress_exceptions:
3494                        raise
3495
3496        msg = self._out_messages.pop(mid)
3497        msg.info._set_as_published()
3498        if msg.qos > 0:
3499            self._inflight_messages -= 1
3500            if self._max_inflight_messages > 0:
3501                rc = self._update_inflight()
3502                if rc != MQTT_ERR_SUCCESS:
3503                    return rc
3504        return MQTT_ERR_SUCCESS
3505
3506    def _handle_pubackcomp(self, cmd):
3507        if self._protocol == MQTTv5:
3508            if self._in_packet['remaining_length'] < 2:
3509                return MQTT_ERR_PROTOCOL
3510        elif self._in_packet['remaining_length'] != 2:
3511            return MQTT_ERR_PROTOCOL
3512
3513        packet_type = PUBACK if cmd == "PUBACK" else PUBCOMP
3514        packet_type = packet_type >> 4
3515        mid, = struct.unpack("!H", self._in_packet['packet'][:2])
3516        if self._protocol == MQTTv5:
3517            if self._in_packet['remaining_length'] > 2:
3518                reasonCode = ReasonCodes(packet_type)
3519                reasonCode.unpack(self._in_packet['packet'][2:])
3520                if self._in_packet['remaining_length'] > 3:
3521                    properties = Properties(packet_type)
3522                    props, props_len = properties.unpack(
3523                        self._in_packet['packet'][3:])
3524        self._easy_log(MQTT_LOG_DEBUG, "Received %s (Mid: %d)", cmd, mid)
3525
3526        with self._out_message_mutex:
3527            if mid in self._out_messages:
3528                # Only inform the client the message has been sent once.
3529                rc = self._do_on_publish(mid)
3530                return rc
3531
3532        return MQTT_ERR_SUCCESS
3533
3534    def _handle_on_message(self, message):
3535        matched = False
3536
3537        try:
3538            topic = message.topic
3539        except UnicodeDecodeError:
3540            topic = None
3541
3542        on_message_callbacks = []
3543        with self._callback_mutex:
3544            if topic is not None:
3545                for callback in self._on_message_filtered.iter_match(message.topic):
3546                    on_message_callbacks.append(callback)
3547
3548            if len(on_message_callbacks) == 0:
3549                on_message = self.on_message
3550            else:
3551                on_message = None
3552
3553        for callback in on_message_callbacks:
3554            with self._in_callback_mutex:
3555                try:
3556                    callback(self, self._userdata, message)
3557                except Exception as err:
3558                    self._easy_log(
3559                        MQTT_LOG_ERR,
3560                        'Caught exception in user defined callback function %s: %s',
3561                        callback.__name__,
3562                        err
3563                    )
3564                    if not self.suppress_exceptions:
3565                        raise
3566
3567        if on_message:
3568            with self._in_callback_mutex:
3569                try:
3570                    on_message(self, self._userdata, message)
3571                except Exception as err:
3572                    self._easy_log(
3573                        MQTT_LOG_ERR, 'Caught exception in on_message: %s', err)
3574                    if not self.suppress_exceptions:
3575                        raise
3576
3577
3578    def _handle_on_connect_fail(self):
3579        with self._callback_mutex:
3580            on_connect_fail = self.on_connect_fail
3581
3582        if on_connect_fail:
3583            with self._in_callback_mutex:
3584                try:
3585                    on_connect_fail(self, self._userdata)
3586                except Exception as err:
3587                    self._easy_log(
3588                        MQTT_LOG_ERR, 'Caught exception in on_connect_fail: %s', err)
3589
3590    def _thread_main(self):
3591        self.loop_forever(retry_first_connection=True)
3592
3593    def _reconnect_wait(self):
3594        # See reconnect_delay_set for details
3595        now = time_func()
3596        with self._reconnect_delay_mutex:
3597            if self._reconnect_delay is None:
3598                self._reconnect_delay = self._reconnect_min_delay
3599            else:
3600                self._reconnect_delay = min(
3601                    self._reconnect_delay * 2,
3602                    self._reconnect_max_delay,
3603                )
3604
3605            target_time = now + self._reconnect_delay
3606
3607        remaining = target_time - now
3608        while (self._state != mqtt_cs_disconnecting
3609                and not self._thread_terminate
3610                and remaining > 0):
3611
3612            time.sleep(min(remaining, 1))
3613            remaining = target_time - time_func()
3614
3615    @staticmethod
3616    def _proxy_is_valid(p):
3617        def check(t, a):
3618            return (socks is not None and
3619                    t in set([socks.HTTP, socks.SOCKS4, socks.SOCKS5]) and a)
3620
3621        if isinstance(p, dict):
3622            return check(p.get("proxy_type"), p.get("proxy_addr"))
3623        elif isinstance(p, (list, tuple)):
3624            return len(p) == 6 and check(p[0], p[1])
3625        else:
3626            return False
3627
3628    def _get_proxy(self):
3629        if socks is None:
3630            return None
3631
3632        # First, check if the user explicitly passed us a proxy to use
3633        if self._proxy_is_valid(self._proxy):
3634            return self._proxy
3635
3636        # Next, check for an mqtt_proxy environment variable as long as the host
3637        # we're trying to connect to isn't listed under the no_proxy environment
3638        # variable (matches built-in module urllib's behavior)
3639        if not (hasattr(urllib_dot_request, "proxy_bypass") and
3640                urllib_dot_request.proxy_bypass(self._host)):
3641            env_proxies = urllib_dot_request.getproxies()
3642            if "mqtt" in env_proxies:
3643                parts = urllib_dot_parse.urlparse(env_proxies["mqtt"])
3644                if parts.scheme == "http":
3645                    proxy = {
3646                        "proxy_type": socks.HTTP,
3647                        "proxy_addr": parts.hostname,
3648                        "proxy_port": parts.port
3649                    }
3650                    return proxy
3651                elif parts.scheme == "socks":
3652                    proxy = {
3653                        "proxy_type": socks.SOCKS5,
3654                        "proxy_addr": parts.hostname,
3655                        "proxy_port": parts.port
3656                    }
3657                    return proxy
3658
3659        # Finally, check if the user has monkeypatched the PySocks library with
3660        # a default proxy
3661        socks_default = socks.get_default_proxy()
3662        if self._proxy_is_valid(socks_default):
3663            proxy_keys = ("proxy_type", "proxy_addr", "proxy_port",
3664                          "proxy_rdns", "proxy_username", "proxy_password")
3665            return dict(zip(proxy_keys, socks_default))
3666
3667        # If we didn't find a proxy through any of the above methods, return
3668        # None to indicate that the connection should be handled normally
3669        return None
3670
3671    def _create_socket_connection(self):
3672        proxy = self._get_proxy()
3673        addr = (self._host, self._port)
3674        source = (self._bind_address, self._bind_port)
3675
3676
3677        if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 2):
3678            # Have to short-circuit here because of unsupported source_address
3679            # param in earlier Python versions.
3680            return socket.create_connection(addr, timeout=self._connect_timeout)
3681
3682        if proxy:
3683            return socks.create_connection(addr, timeout=self._connect_timeout, source_address=source, **proxy)
3684        else:
3685            return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
3686
3687
3688class WebsocketWrapper(object):
3689    OPCODE_CONTINUATION = 0x0
3690    OPCODE_TEXT = 0x1
3691    OPCODE_BINARY = 0x2
3692    OPCODE_CONNCLOSE = 0x8
3693    OPCODE_PING = 0x9
3694    OPCODE_PONG = 0xa
3695
3696    def __init__(self, socket, host, port, is_ssl, path, extra_headers):
3697
3698        self.connected = False
3699
3700        self._ssl = is_ssl
3701        self._host = host
3702        self._port = port
3703        self._socket = socket
3704        self._path = path
3705
3706        self._sendbuffer = bytearray()
3707        self._readbuffer = bytearray()
3708
3709        self._requested_size = 0
3710        self._payload_head = 0
3711        self._readbuffer_head = 0
3712
3713        self._do_handshake(extra_headers)
3714
3715    def __del__(self):
3716
3717        self._sendbuffer = None
3718        self._readbuffer = None
3719
3720    def _do_handshake(self, extra_headers):
3721
3722        sec_websocket_key = uuid.uuid4().bytes
3723        sec_websocket_key = base64.b64encode(sec_websocket_key)
3724
3725        websocket_headers = {
3726            "Host": "{self._host:s}:{self._port:d}".format(self=self),
3727            "Upgrade": "websocket",
3728            "Connection": "Upgrade",
3729            "Origin": "https://{self._host:s}:{self._port:d}".format(self=self),
3730            "Sec-WebSocket-Key": sec_websocket_key.decode("utf8"),
3731            "Sec-Websocket-Version": "13",
3732            "Sec-Websocket-Protocol": "mqtt",
3733        }
3734
3735        # This is checked in ws_set_options so it will either be None, a
3736        # dictionary, or a callable
3737        if isinstance(extra_headers, dict):
3738            websocket_headers.update(extra_headers)
3739        elif callable(extra_headers):
3740            websocket_headers = extra_headers(websocket_headers)
3741
3742        header = "\r\n".join([
3743            "GET {self._path} HTTP/1.1".format(self=self),
3744            "\r\n".join("{}: {}".format(i, j)
3745                        for i, j in websocket_headers.items()),
3746            "\r\n",
3747        ]).encode("utf8")
3748
3749        self._socket.send(header)
3750
3751        has_secret = False
3752        has_upgrade = False
3753
3754        while True:
3755            # read HTTP response header as lines
3756            byte = self._socket.recv(1)
3757
3758            self._readbuffer.extend(byte)
3759
3760            # line end
3761            if byte == b"\n":
3762                if len(self._readbuffer) > 2:
3763                    # check upgrade
3764                    if b"connection" in str(self._readbuffer).lower().encode('utf-8'):
3765                        if b"upgrade" not in str(self._readbuffer).lower().encode('utf-8'):
3766                            raise WebsocketConnectionError(
3767                                "WebSocket handshake error, connection not upgraded")
3768                        else:
3769                            has_upgrade = True
3770
3771                    # check key hash
3772                    if b"sec-websocket-accept" in str(self._readbuffer).lower().encode('utf-8'):
3773                        GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
3774
3775                        server_hash = self._readbuffer.decode(
3776                            'utf-8').split(": ", 1)[1]
3777                        server_hash = server_hash.strip().encode('utf-8')
3778
3779                        client_hash = sec_websocket_key.decode('utf-8') + GUID
3780                        client_hash = hashlib.sha1(client_hash.encode('utf-8'))
3781                        client_hash = base64.b64encode(client_hash.digest())
3782
3783                        if server_hash != client_hash:
3784                            raise WebsocketConnectionError(
3785                                "WebSocket handshake error, invalid secret key")
3786                        else:
3787                            has_secret = True
3788                else:
3789                    # ending linebreak
3790                    break
3791
3792                # reset linebuffer
3793                self._readbuffer = bytearray()
3794
3795            # connection reset
3796            elif not byte:
3797                raise WebsocketConnectionError("WebSocket handshake error")
3798
3799        if not has_upgrade or not has_secret:
3800            raise WebsocketConnectionError("WebSocket handshake error")
3801
3802        self._readbuffer = bytearray()
3803        self.connected = True
3804
3805    def _create_frame(self, opcode, data, do_masking=1):
3806
3807        header = bytearray()
3808        length = len(data)
3809
3810        mask_key = bytearray(os.urandom(4))
3811        mask_flag = do_masking
3812
3813        # 1 << 7 is the final flag, we don't send continuated data
3814        header.append(1 << 7 | opcode)
3815
3816        if length < 126:
3817            header.append(mask_flag << 7 | length)
3818
3819        elif length < 65536:
3820            header.append(mask_flag << 7 | 126)
3821            header += struct.pack("!H", length)
3822
3823        elif length < 0x8000000000000001:
3824            header.append(mask_flag << 7 | 127)
3825            header += struct.pack("!Q", length)
3826
3827        else:
3828            raise ValueError("Maximum payload size is 2^63")
3829
3830        if mask_flag == 1:
3831            for index in range(length):
3832                data[index] ^= mask_key[index % 4]
3833            data = mask_key + data
3834
3835        return header + data
3836
3837    def _buffered_read(self, length):
3838
3839        # try to recv and store needed bytes
3840        wanted_bytes = length - (len(self._readbuffer) - self._readbuffer_head)
3841        if wanted_bytes > 0:
3842
3843            data = self._socket.recv(wanted_bytes)
3844
3845            if not data:
3846                raise ConnectionAbortedError
3847            else:
3848                self._readbuffer.extend(data)
3849
3850            if len(data) < wanted_bytes:
3851                raise BlockingIOError
3852
3853        self._readbuffer_head += length
3854        return self._readbuffer[self._readbuffer_head - length:self._readbuffer_head]
3855
3856    def _recv_impl(self, length):
3857
3858        # try to decode websocket payload part from data
3859        try:
3860
3861            self._readbuffer_head = 0
3862
3863            result = None
3864
3865            chunk_startindex = self._payload_head
3866            chunk_endindex = self._payload_head + length
3867
3868            header1 = self._buffered_read(1)
3869            header2 = self._buffered_read(1)
3870
3871            opcode = (header1[0] & 0x0f)
3872            maskbit = (header2[0] & 0x80) == 0x80
3873            lengthbits = (header2[0] & 0x7f)
3874            payload_length = lengthbits
3875            mask_key = None
3876
3877            # read length
3878            if lengthbits == 0x7e:
3879
3880                value = self._buffered_read(2)
3881                payload_length, = struct.unpack("!H", value)
3882
3883            elif lengthbits == 0x7f:
3884
3885                value = self._buffered_read(8)
3886                payload_length, = struct.unpack("!Q", value)
3887
3888            # read mask
3889            if maskbit:
3890                mask_key = self._buffered_read(4)
3891
3892            # if frame payload is shorter than the requested data, read only the possible part
3893            readindex = chunk_endindex
3894            if payload_length < readindex:
3895                readindex = payload_length
3896
3897            if readindex > 0:
3898                # get payload chunk
3899                payload = self._buffered_read(readindex)
3900
3901                # unmask only the needed part
3902                if maskbit:
3903                    for index in range(chunk_startindex, readindex):
3904                        payload[index] ^= mask_key[index % 4]
3905
3906                result = payload[chunk_startindex:readindex]
3907                self._payload_head = readindex
3908            else:
3909                payload = bytearray()
3910
3911            # check if full frame arrived and reset readbuffer and payloadhead if needed
3912            if readindex == payload_length:
3913                self._readbuffer = bytearray()
3914                self._payload_head = 0
3915
3916                # respond to non-binary opcodes, their arrival is not guaranteed beacause of non-blocking sockets
3917                if opcode == WebsocketWrapper.OPCODE_CONNCLOSE:
3918                    frame = self._create_frame(
3919                        WebsocketWrapper.OPCODE_CONNCLOSE, payload, 0)
3920                    self._socket.send(frame)
3921
3922                if opcode == WebsocketWrapper.OPCODE_PING:
3923                    frame = self._create_frame(
3924                        WebsocketWrapper.OPCODE_PONG, payload, 0)
3925                    self._socket.send(frame)
3926
3927            # This isn't *proper* handling of continuation frames, but given
3928            # that we only support binary frames, it is *probably* good enough.
3929            if (opcode == WebsocketWrapper.OPCODE_BINARY or opcode == WebsocketWrapper.OPCODE_CONTINUATION) \
3930                    and payload_length > 0:
3931                return result
3932            else:
3933                raise BlockingIOError
3934
3935        except ConnectionError:
3936            self.connected = False
3937            return b''
3938
3939    def _send_impl(self, data):
3940
3941        # if previous frame was sent successfully
3942        if len(self._sendbuffer) == 0:
3943            # create websocket frame
3944            frame = self._create_frame(
3945                WebsocketWrapper.OPCODE_BINARY, bytearray(data))
3946            self._sendbuffer.extend(frame)
3947            self._requested_size = len(data)
3948
3949        # try to write out as much as possible
3950        length = self._socket.send(self._sendbuffer)
3951
3952        self._sendbuffer = self._sendbuffer[length:]
3953
3954        if len(self._sendbuffer) == 0:
3955            # buffer sent out completely, return with payload's size
3956            return self._requested_size
3957        else:
3958            # couldn't send whole data, request the same data again with 0 as sent length
3959            return 0
3960
3961    def recv(self, length):
3962        return self._recv_impl(length)
3963
3964    def read(self, length):
3965        return self._recv_impl(length)
3966
3967    def send(self, data):
3968        return self._send_impl(data)
3969
3970    def write(self, data):
3971        return self._send_impl(data)
3972
3973    def close(self):
3974        self._socket.close()
3975
3976    def fileno(self):
3977        return self._socket.fileno()
3978
3979    def pending(self):
3980        # Fix for bug #131: a SSL socket may still have data available
3981        # for reading without select() being aware of it.
3982        if self._ssl:
3983            return self._socket.pending()
3984        else:
3985            # normal socket rely only on select()
3986            return 0
3987
3988    def setblocking(self, flag):
3989        self._socket.setblocking(flag)
3990