1# Copyright (c) 2012-2019 Roger Light and others 2# 3# All rights reserved. This program and the accompanying materials 4# are made available under the terms of the Eclipse Public License v2.0 5# and Eclipse Distribution License v1.0 which accompany this distribution. 6# 7# The Eclipse Public License is available at 8# http://www.eclipse.org/legal/epl-v10.html 9# and the Eclipse Distribution License is available at 10# http://www.eclipse.org/org/documents/edl-v10.php. 11# 12# Contributors: 13# Roger Light - initial API and implementation 14# Ian Craggs - MQTT V5 support 15 16import base64 17import hashlib 18import logging 19import string 20import struct 21import sys 22import threading 23import time 24import uuid 25 26from .matcher import MQTTMatcher 27from .properties import Properties 28from .reasoncodes import ReasonCodes 29from .subscribeoptions import SubscribeOptions 30 31""" 32This is an MQTT client module. MQTT is a lightweight pub/sub messaging 33protocol that is easy to implement and suitable for low powered devices. 34""" 35import collections 36import errno 37import os 38import platform 39import select 40import socket 41 42ssl = None 43try: 44 import ssl 45except ImportError: 46 pass 47 48socks = None 49try: 50 import socks 51except ImportError: 52 pass 53 54try: 55 # Python 3 56 from urllib import parse as urllib_dot_parse 57 from urllib import request as urllib_dot_request 58except ImportError: 59 # Python 2 60 import urllib as urllib_dot_request 61 62 import urlparse as urllib_dot_parse 63 64 65try: 66 # Use monotonic clock if available 67 time_func = time.monotonic 68except AttributeError: 69 time_func = time.time 70 71try: 72 import dns.resolver 73except ImportError: 74 HAVE_DNS = False 75else: 76 HAVE_DNS = True 77 78 79if platform.system() == 'Windows': 80 EAGAIN = errno.WSAEWOULDBLOCK 81else: 82 EAGAIN = errno.EAGAIN 83 84# Python 2.7 does not have BlockingIOError. Fall back to IOError 85try: 86 BlockingIOError 87except NameError: 88 BlockingIOError = IOError 89 90MQTTv31 = 3 91MQTTv311 = 4 92MQTTv5 = 5 93 94if sys.version_info[0] >= 3: 95 # define some alias for python2 compatibility 96 unicode = str 97 basestring = str 98 99# Message types 100CONNECT = 0x10 101CONNACK = 0x20 102PUBLISH = 0x30 103PUBACK = 0x40 104PUBREC = 0x50 105PUBREL = 0x60 106PUBCOMP = 0x70 107SUBSCRIBE = 0x80 108SUBACK = 0x90 109UNSUBSCRIBE = 0xA0 110UNSUBACK = 0xB0 111PINGREQ = 0xC0 112PINGRESP = 0xD0 113DISCONNECT = 0xE0 114AUTH = 0xF0 115 116# Log levels 117MQTT_LOG_INFO = 0x01 118MQTT_LOG_NOTICE = 0x02 119MQTT_LOG_WARNING = 0x04 120MQTT_LOG_ERR = 0x08 121MQTT_LOG_DEBUG = 0x10 122LOGGING_LEVEL = { 123 MQTT_LOG_DEBUG: logging.DEBUG, 124 MQTT_LOG_INFO: logging.INFO, 125 MQTT_LOG_NOTICE: logging.INFO, # This has no direct equivalent level 126 MQTT_LOG_WARNING: logging.WARNING, 127 MQTT_LOG_ERR: logging.ERROR, 128} 129 130# CONNACK codes 131CONNACK_ACCEPTED = 0 132CONNACK_REFUSED_PROTOCOL_VERSION = 1 133CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 134CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 135CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 136CONNACK_REFUSED_NOT_AUTHORIZED = 5 137 138# Connection state 139mqtt_cs_new = 0 140mqtt_cs_connected = 1 141mqtt_cs_disconnecting = 2 142mqtt_cs_connect_async = 3 143 144# Message state 145mqtt_ms_invalid = 0 146mqtt_ms_publish = 1 147mqtt_ms_wait_for_puback = 2 148mqtt_ms_wait_for_pubrec = 3 149mqtt_ms_resend_pubrel = 4 150mqtt_ms_wait_for_pubrel = 5 151mqtt_ms_resend_pubcomp = 6 152mqtt_ms_wait_for_pubcomp = 7 153mqtt_ms_send_pubrec = 8 154mqtt_ms_queued = 9 155 156# Error values 157MQTT_ERR_AGAIN = -1 158MQTT_ERR_SUCCESS = 0 159MQTT_ERR_NOMEM = 1 160MQTT_ERR_PROTOCOL = 2 161MQTT_ERR_INVAL = 3 162MQTT_ERR_NO_CONN = 4 163MQTT_ERR_CONN_REFUSED = 5 164MQTT_ERR_NOT_FOUND = 6 165MQTT_ERR_CONN_LOST = 7 166MQTT_ERR_TLS = 8 167MQTT_ERR_PAYLOAD_SIZE = 9 168MQTT_ERR_NOT_SUPPORTED = 10 169MQTT_ERR_AUTH = 11 170MQTT_ERR_ACL_DENIED = 12 171MQTT_ERR_UNKNOWN = 13 172MQTT_ERR_ERRNO = 14 173MQTT_ERR_QUEUE_SIZE = 15 174MQTT_ERR_KEEPALIVE = 16 175 176MQTT_CLIENT = 0 177MQTT_BRIDGE = 1 178 179# For MQTT V5, use the clean start flag only on the first successful connect 180MQTT_CLEAN_START_FIRST_ONLY = 3 181 182sockpair_data = b"0" 183 184 185class WebsocketConnectionError(ValueError): 186 pass 187 188 189def error_string(mqtt_errno): 190 """Return the error string associated with an mqtt error number.""" 191 if mqtt_errno == MQTT_ERR_SUCCESS: 192 return "No error." 193 elif mqtt_errno == MQTT_ERR_NOMEM: 194 return "Out of memory." 195 elif mqtt_errno == MQTT_ERR_PROTOCOL: 196 return "A network protocol error occurred when communicating with the broker." 197 elif mqtt_errno == MQTT_ERR_INVAL: 198 return "Invalid function arguments provided." 199 elif mqtt_errno == MQTT_ERR_NO_CONN: 200 return "The client is not currently connected." 201 elif mqtt_errno == MQTT_ERR_CONN_REFUSED: 202 return "The connection was refused." 203 elif mqtt_errno == MQTT_ERR_NOT_FOUND: 204 return "Message not found (internal error)." 205 elif mqtt_errno == MQTT_ERR_CONN_LOST: 206 return "The connection was lost." 207 elif mqtt_errno == MQTT_ERR_TLS: 208 return "A TLS error occurred." 209 elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE: 210 return "Payload too large." 211 elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED: 212 return "This feature is not supported." 213 elif mqtt_errno == MQTT_ERR_AUTH: 214 return "Authorisation failed." 215 elif mqtt_errno == MQTT_ERR_ACL_DENIED: 216 return "Access denied by ACL." 217 elif mqtt_errno == MQTT_ERR_UNKNOWN: 218 return "Unknown error." 219 elif mqtt_errno == MQTT_ERR_ERRNO: 220 return "Error defined by errno." 221 elif mqtt_errno == MQTT_ERR_QUEUE_SIZE: 222 return "Message queue full." 223 elif mqtt_errno == MQTT_ERR_KEEPALIVE: 224 return "Client or broker did not communicate in the keepalive interval." 225 else: 226 return "Unknown error." 227 228 229def connack_string(connack_code): 230 """Return the string associated with a CONNACK result.""" 231 if connack_code == CONNACK_ACCEPTED: 232 return "Connection Accepted." 233 elif connack_code == CONNACK_REFUSED_PROTOCOL_VERSION: 234 return "Connection Refused: unacceptable protocol version." 235 elif connack_code == CONNACK_REFUSED_IDENTIFIER_REJECTED: 236 return "Connection Refused: identifier rejected." 237 elif connack_code == CONNACK_REFUSED_SERVER_UNAVAILABLE: 238 return "Connection Refused: broker unavailable." 239 elif connack_code == CONNACK_REFUSED_BAD_USERNAME_PASSWORD: 240 return "Connection Refused: bad user name or password." 241 elif connack_code == CONNACK_REFUSED_NOT_AUTHORIZED: 242 return "Connection Refused: not authorised." 243 else: 244 return "Connection Refused: unknown reason." 245 246 247def base62(num, base=string.digits + string.ascii_letters, padding=1): 248 """Convert a number to base-62 representation.""" 249 assert num >= 0 250 digits = [] 251 while num: 252 num, rest = divmod(num, 62) 253 digits.append(base[rest]) 254 digits.extend(base[0] for _ in range(len(digits), padding)) 255 return ''.join(reversed(digits)) 256 257 258def topic_matches_sub(sub, topic): 259 """Check whether a topic matches a subscription. 260 261 For example: 262 263 foo/bar would match the subscription foo/# or +/bar 264 non/matching would not match the subscription non/+/+ 265 """ 266 matcher = MQTTMatcher() 267 matcher[sub] = True 268 try: 269 next(matcher.iter_match(topic)) 270 return True 271 except StopIteration: 272 return False 273 274 275def _socketpair_compat(): 276 """TCP/IP socketpair including Windows support""" 277 listensock = socket.socket( 278 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) 279 listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 280 listensock.bind(("127.0.0.1", 0)) 281 listensock.listen(1) 282 283 iface, port = listensock.getsockname() 284 sock1 = socket.socket( 285 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) 286 sock1.setblocking(0) 287 try: 288 sock1.connect(("127.0.0.1", port)) 289 except BlockingIOError: 290 pass 291 sock2, address = listensock.accept() 292 sock2.setblocking(0) 293 listensock.close() 294 return (sock1, sock2) 295 296 297class MQTTMessageInfo(object): 298 """This is a class returned from Client.publish() and can be used to find 299 out the mid of the message that was published, and to determine whether the 300 message has been published, and/or wait until it is published. 301 """ 302 303 __slots__ = 'mid', '_published', '_condition', 'rc', '_iterpos' 304 305 def __init__(self, mid): 306 self.mid = mid 307 self._published = False 308 self._condition = threading.Condition() 309 self.rc = 0 310 self._iterpos = 0 311 312 def __str__(self): 313 return str((self.rc, self.mid)) 314 315 def __iter__(self): 316 self._iterpos = 0 317 return self 318 319 def __next__(self): 320 return self.next() 321 322 def next(self): 323 if self._iterpos == 0: 324 self._iterpos = 1 325 return self.rc 326 elif self._iterpos == 1: 327 self._iterpos = 2 328 return self.mid 329 else: 330 raise StopIteration 331 332 def __getitem__(self, index): 333 if index == 0: 334 return self.rc 335 elif index == 1: 336 return self.mid 337 else: 338 raise IndexError("index out of range") 339 340 def _set_as_published(self): 341 with self._condition: 342 self._published = True 343 self._condition.notify() 344 345 def wait_for_publish(self, timeout=None): 346 """Block until the message associated with this object is published, or 347 until the timeout occurs. If timeout is None, this will never time out. 348 Set timeout to a positive number of seconds, e.g. 1.2, to enable the 349 timeout. 350 351 Raises ValueError if the message was not queued due to the outgoing 352 queue being full. 353 354 Raises RuntimeError if the message was not published for another 355 reason. 356 """ 357 if self.rc == MQTT_ERR_QUEUE_SIZE: 358 raise ValueError('Message is not queued due to ERR_QUEUE_SIZE') 359 elif self.rc == MQTT_ERR_AGAIN: 360 pass 361 elif self.rc > 0: 362 raise RuntimeError('Message publish failed: %s' % (error_string(self.rc))) 363 364 timeout_time = None if timeout is None else time.time() + timeout 365 timeout_tenth = None if timeout is None else timeout / 10. 366 def timed_out(): 367 return False if timeout is None else time.time() > timeout_time 368 369 with self._condition: 370 while not self._published and not timed_out(): 371 self._condition.wait(timeout_tenth) 372 373 def is_published(self): 374 """Returns True if the message associated with this object has been 375 published, else returns False.""" 376 if self.rc == MQTT_ERR_QUEUE_SIZE: 377 raise ValueError('Message is not queued due to ERR_QUEUE_SIZE') 378 elif self.rc == MQTT_ERR_AGAIN: 379 pass 380 elif self.rc > 0: 381 raise RuntimeError('Message publish failed: %s' % (error_string(self.rc))) 382 383 with self._condition: 384 return self._published 385 386 387class MQTTMessage(object): 388 """ This is a class that describes an incoming or outgoing message. It is 389 passed to the on_message callback as the message parameter. 390 391 Members: 392 393 topic : String. topic that the message was published on. 394 payload : Bytes/Byte array. the message payload. 395 qos : Integer. The message Quality of Service 0, 1 or 2. 396 retain : Boolean. If true, the message is a retained message and not fresh. 397 mid : Integer. The message id. 398 properties: Properties class. In MQTT v5.0, the properties associated with the message. 399 """ 400 401 __slots__ = 'timestamp', 'state', 'dup', 'mid', '_topic', 'payload', 'qos', 'retain', 'info', 'properties' 402 403 def __init__(self, mid=0, topic=b""): 404 self.timestamp = 0 405 self.state = mqtt_ms_invalid 406 self.dup = False 407 self.mid = mid 408 self._topic = topic 409 self.payload = b"" 410 self.qos = 0 411 self.retain = False 412 self.info = MQTTMessageInfo(mid) 413 414 def __eq__(self, other): 415 """Override the default Equals behavior""" 416 if isinstance(other, self.__class__): 417 return self.mid == other.mid 418 return False 419 420 def __ne__(self, other): 421 """Define a non-equality test""" 422 return not self.__eq__(other) 423 424 @property 425 def topic(self): 426 return self._topic.decode('utf-8') 427 428 @topic.setter 429 def topic(self, value): 430 self._topic = value 431 432 433class Client(object): 434 """MQTT version 3.1/3.1.1/5.0 client class. 435 436 This is the main class for use communicating with an MQTT broker. 437 438 General usage flow: 439 440 * Use connect()/connect_async() to connect to a broker 441 * Call loop() frequently to maintain network traffic flow with the broker 442 * Or use loop_start() to set a thread running to call loop() for you. 443 * Or use loop_forever() to handle calling loop() for you in a blocking 444 * function. 445 * Use subscribe() to subscribe to a topic and receive messages 446 * Use publish() to send messages 447 * Use disconnect() to disconnect from the broker 448 449 Data returned from the broker is made available with the use of callback 450 functions as described below. 451 452 Callbacks 453 ========= 454 455 A number of callback functions are available to receive data back from the 456 broker. To use a callback, define a function and then assign it to the 457 client: 458 459 def on_connect(client, userdata, flags, rc): 460 print("Connection returned " + str(rc)) 461 462 client.on_connect = on_connect 463 464 Callbacks can also be attached using decorators: 465 466 client = paho.mqtt.Client() 467 468 @client.connect_callback() 469 def on_connect(client, userdata, flags, rc): 470 print("Connection returned " + str(rc)) 471 472 473 **IMPORTANT** the required function signature for a callback can differ 474 depending on whether you are using MQTT v5 or MQTT v3.1.1/v3.1. See the 475 documentation for each callback. 476 477 All of the callbacks as described below have a "client" and an "userdata" 478 argument. "client" is the Client instance that is calling the callback. 479 "userdata" is user data of any type and can be set when creating a new client 480 instance or with user_data_set(userdata). 481 482 If you wish to suppress exceptions within a callback, you should set 483 `client.suppress_exceptions = True` 484 485 The callbacks are listed below, documentation for each of them can be found 486 at the same function name: 487 488 on_connect, on_connect_fail, on_disconnect, on_message, on_publish, 489 on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close, 490 on_socket_register_write, on_socket_unregister_write 491 """ 492 493 def __init__(self, client_id="", clean_session=None, userdata=None, 494 protocol=MQTTv311, transport="tcp", reconnect_on_failure=True): 495 """client_id is the unique client id string used when connecting to the 496 broker. If client_id is zero length or None, then the behaviour is 497 defined by which protocol version is in use. If using MQTT v3.1.1, then 498 a zero length client id will be sent to the broker and the broker will 499 generate a random for the client. If using MQTT v3.1 then an id will be 500 randomly generated. In both cases, clean_session must be True. If this 501 is not the case a ValueError will be raised. 502 503 clean_session is a boolean that determines the client type. If True, 504 the broker will remove all information about this client when it 505 disconnects. If False, the client is a persistent client and 506 subscription information and queued messages will be retained when the 507 client disconnects. 508 Note that a client will never discard its own outgoing messages on 509 disconnect. Calling connect() or reconnect() will cause the messages to 510 be resent. Use reinitialise() to reset a client to its original state. 511 The clean_session argument only applies to MQTT versions v3.1.1 and v3.1. 512 It is not accepted if the MQTT version is v5.0 - use the clean_start 513 argument on connect() instead. 514 515 userdata is user defined data of any type that is passed as the "userdata" 516 parameter to callbacks. It may be updated at a later point with the 517 user_data_set() function. 518 519 The protocol argument allows explicit setting of the MQTT version to 520 use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1), 521 paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 (v5.0), 522 with the default being v3.1.1. 523 524 Set transport to "websockets" to use WebSockets as the transport 525 mechanism. Set to "tcp" to use raw TCP, which is the default. 526 """ 527 528 if transport.lower() not in ('websockets', 'tcp'): 529 raise ValueError( 530 'transport must be "websockets" or "tcp", not %s' % transport) 531 self._transport = transport.lower() 532 self._protocol = protocol 533 self._userdata = userdata 534 self._sock = None 535 self._sockpairR, self._sockpairW = (None, None,) 536 self._keepalive = 60 537 self._connect_timeout = 5.0 538 self._client_mode = MQTT_CLIENT 539 540 if protocol == MQTTv5: 541 if clean_session is not None: 542 raise ValueError('Clean session is not used for MQTT 5.0') 543 else: 544 if clean_session is None: 545 clean_session = True 546 if not clean_session and (client_id == "" or client_id is None): 547 raise ValueError( 548 'A client id must be provided if clean session is False.') 549 self._clean_session = clean_session 550 551 # [MQTT-3.1.3-4] Client Id must be UTF-8 encoded string. 552 if client_id == "" or client_id is None: 553 if protocol == MQTTv31: 554 self._client_id = base62(uuid.uuid4().int, padding=22) 555 else: 556 self._client_id = b"" 557 else: 558 self._client_id = client_id 559 if isinstance(self._client_id, unicode): 560 self._client_id = self._client_id.encode('utf-8') 561 562 self._username = None 563 self._password = None 564 self._in_packet = { 565 "command": 0, 566 "have_remaining": 0, 567 "remaining_count": [], 568 "remaining_mult": 1, 569 "remaining_length": 0, 570 "packet": bytearray(b""), 571 "to_process": 0, 572 "pos": 0} 573 self._out_packet = collections.deque() 574 self._last_msg_in = time_func() 575 self._last_msg_out = time_func() 576 self._reconnect_min_delay = 1 577 self._reconnect_max_delay = 120 578 self._reconnect_delay = None 579 self._reconnect_on_failure = reconnect_on_failure 580 self._ping_t = 0 581 self._last_mid = 0 582 self._state = mqtt_cs_new 583 self._out_messages = collections.OrderedDict() 584 self._in_messages = collections.OrderedDict() 585 self._max_inflight_messages = 20 586 self._inflight_messages = 0 587 self._max_queued_messages = 0 588 self._connect_properties = None 589 self._will_properties = None 590 self._will = False 591 self._will_topic = b"" 592 self._will_payload = b"" 593 self._will_qos = 0 594 self._will_retain = False 595 self._on_message_filtered = MQTTMatcher() 596 self._host = "" 597 self._port = 1883 598 self._bind_address = "" 599 self._bind_port = 0 600 self._proxy = {} 601 self._in_callback_mutex = threading.Lock() 602 self._callback_mutex = threading.RLock() 603 self._msgtime_mutex = threading.Lock() 604 self._out_message_mutex = threading.RLock() 605 self._in_message_mutex = threading.Lock() 606 self._reconnect_delay_mutex = threading.Lock() 607 self._mid_generate_mutex = threading.Lock() 608 self._thread = None 609 self._thread_terminate = False 610 self._ssl = False 611 self._ssl_context = None 612 # Only used when SSL context does not have check_hostname attribute 613 self._tls_insecure = False 614 self._logger = None 615 self._registered_write = False 616 # No default callbacks 617 self._on_log = None 618 self._on_connect = None 619 self._on_connect_fail = None 620 self._on_subscribe = None 621 self._on_message = None 622 self._on_publish = None 623 self._on_unsubscribe = None 624 self._on_disconnect = None 625 self._on_socket_open = None 626 self._on_socket_close = None 627 self._on_socket_register_write = None 628 self._on_socket_unregister_write = None 629 self._websocket_path = "/mqtt" 630 self._websocket_extra_headers = None 631 # for clean_start == MQTT_CLEAN_START_FIRST_ONLY 632 self._mqttv5_first_connect = True 633 self.suppress_exceptions = False # For callbacks 634 635 def __del__(self): 636 self._reset_sockets() 637 638 def _sock_recv(self, bufsize): 639 try: 640 return self._sock.recv(bufsize) 641 except ssl.SSLWantReadError: 642 raise BlockingIOError 643 except ssl.SSLWantWriteError: 644 self._call_socket_register_write() 645 raise BlockingIOError 646 647 def _sock_send(self, buf): 648 try: 649 return self._sock.send(buf) 650 except ssl.SSLWantReadError: 651 raise BlockingIOError 652 except ssl.SSLWantWriteError: 653 self._call_socket_register_write() 654 raise BlockingIOError 655 except BlockingIOError: 656 self._call_socket_register_write() 657 raise BlockingIOError 658 659 def _sock_close(self): 660 """Close the connection to the server.""" 661 if not self._sock: 662 return 663 664 try: 665 sock = self._sock 666 self._sock = None 667 self._call_socket_unregister_write(sock) 668 self._call_socket_close(sock) 669 finally: 670 # In case a callback fails, still close the socket to avoid leaking the file descriptor. 671 sock.close() 672 673 def _reset_sockets(self, sockpair_only=False): 674 if sockpair_only == False: 675 self._sock_close() 676 677 if self._sockpairR: 678 self._sockpairR.close() 679 self._sockpairR = None 680 if self._sockpairW: 681 self._sockpairW.close() 682 self._sockpairW = None 683 684 def reinitialise(self, client_id="", clean_session=True, userdata=None): 685 self._reset_sockets() 686 687 self.__init__(client_id, clean_session, userdata) 688 689 def ws_set_options(self, path="/mqtt", headers=None): 690 """ Set the path and headers for a websocket connection 691 692 path is a string starting with / which should be the endpoint of the 693 mqtt connection on the remote server 694 695 headers can be either a dict or a callable object. If it is a dict then 696 the extra items in the dict are added to the websocket headers. If it is 697 a callable, then the default websocket headers are passed into this 698 function and the result is used as the new headers. 699 """ 700 self._websocket_path = path 701 702 if headers is not None: 703 if isinstance(headers, dict) or callable(headers): 704 self._websocket_extra_headers = headers 705 else: 706 raise ValueError( 707 "'headers' option to ws_set_options has to be either a dictionary or callable") 708 709 def tls_set_context(self, context=None): 710 """Configure network encryption and authentication context. Enables SSL/TLS support. 711 712 context : an ssl.SSLContext object. By default this is given by 713 `ssl.create_default_context()`, if available. 714 715 Must be called before connect() or connect_async().""" 716 if self._ssl_context is not None: 717 raise ValueError('SSL/TLS has already been configured.') 718 719 # Assume that have SSL support, or at least that context input behaves like ssl.SSLContext 720 # in current versions of Python 721 722 if context is None: 723 if hasattr(ssl, 'create_default_context'): 724 context = ssl.create_default_context() 725 else: 726 raise ValueError('SSL/TLS context must be specified') 727 728 self._ssl = True 729 self._ssl_context = context 730 731 # Ensure _tls_insecure is consistent with check_hostname attribute 732 if hasattr(context, 'check_hostname'): 733 self._tls_insecure = not context.check_hostname 734 735 def tls_set(self, ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=None, ciphers=None, keyfile_password=None): 736 """Configure network encryption and authentication options. Enables SSL/TLS support. 737 738 ca_certs : a string path to the Certificate Authority certificate files 739 that are to be treated as trusted by this client. If this is the only 740 option given then the client will operate in a similar manner to a web 741 browser. That is to say it will require the broker to have a 742 certificate signed by the Certificate Authorities in ca_certs and will 743 communicate using TLS v1,2, but will not attempt any form of 744 authentication. This provides basic network encryption but may not be 745 sufficient depending on how the broker is configured. 746 By default, on Python 2.7.9+ or 3.4+, the default certification 747 authority of the system is used. On older Python version this parameter 748 is mandatory. 749 750 certfile and keyfile are strings pointing to the PEM encoded client 751 certificate and private keys respectively. If these arguments are not 752 None then they will be used as client information for TLS based 753 authentication. Support for this feature is broker dependent. Note 754 that if either of these files in encrypted and needs a password to 755 decrypt it, then this can be passed using the keyfile_password 756 argument - you should take precautions to ensure that your password is 757 not hard coded into your program by loading the password from a file 758 for example. If you do not provide keyfile_password, the password will 759 be requested to be typed in at a terminal window. 760 761 cert_reqs allows the certificate requirements that the client imposes 762 on the broker to be changed. By default this is ssl.CERT_REQUIRED, 763 which means that the broker must provide a certificate. See the ssl 764 pydoc for more information on this parameter. 765 766 tls_version allows the version of the SSL/TLS protocol used to be 767 specified. By default TLS v1.2 is used. Previous versions are allowed 768 but not recommended due to possible security problems. 769 770 ciphers is a string specifying which encryption ciphers are allowable 771 for this connection, or None to use the defaults. See the ssl pydoc for 772 more information. 773 774 Must be called before connect() or connect_async().""" 775 if ssl is None: 776 raise ValueError('This platform has no SSL/TLS.') 777 778 if not hasattr(ssl, 'SSLContext'): 779 # Require Python version that has SSL context support in standard library 780 raise ValueError( 781 'Python 2.7.9 and 3.2 are the minimum supported versions for TLS.') 782 783 if ca_certs is None and not hasattr(ssl.SSLContext, 'load_default_certs'): 784 raise ValueError('ca_certs must not be None.') 785 786 # Create SSLContext object 787 if tls_version is None: 788 tls_version = ssl.PROTOCOL_TLSv1_2 789 # If the python version supports it, use highest TLS version automatically 790 if hasattr(ssl, "PROTOCOL_TLS"): 791 tls_version = ssl.PROTOCOL_TLS 792 context = ssl.SSLContext(tls_version) 793 794 # Configure context 795 if certfile is not None: 796 context.load_cert_chain(certfile, keyfile, keyfile_password) 797 798 if cert_reqs == ssl.CERT_NONE and hasattr(context, 'check_hostname'): 799 context.check_hostname = False 800 801 context.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs 802 803 if ca_certs is not None: 804 context.load_verify_locations(ca_certs) 805 else: 806 context.load_default_certs() 807 808 if ciphers is not None: 809 context.set_ciphers(ciphers) 810 811 self.tls_set_context(context) 812 813 if cert_reqs != ssl.CERT_NONE: 814 # Default to secure, sets context.check_hostname attribute 815 # if available 816 self.tls_insecure_set(False) 817 else: 818 # But with ssl.CERT_NONE, we can not check_hostname 819 self.tls_insecure_set(True) 820 821 def tls_insecure_set(self, value): 822 """Configure verification of the server hostname in the server certificate. 823 824 If value is set to true, it is impossible to guarantee that the host 825 you are connecting to is not impersonating your server. This can be 826 useful in initial server testing, but makes it possible for a malicious 827 third party to impersonate your server through DNS spoofing, for 828 example. 829 830 Do not use this function in a real system. Setting value to true means 831 there is no point using encryption. 832 833 Must be called before connect() and after either tls_set() or 834 tls_set_context().""" 835 836 if self._ssl_context is None: 837 raise ValueError( 838 'Must configure SSL context before using tls_insecure_set.') 839 840 self._tls_insecure = value 841 842 # Ensure check_hostname is consistent with _tls_insecure attribute 843 if hasattr(self._ssl_context, 'check_hostname'): 844 # Rely on SSLContext to check host name 845 # If verify_mode is CERT_NONE then the host name will never be checked 846 self._ssl_context.check_hostname = not value 847 848 def proxy_set(self, **proxy_args): 849 """Configure proxying of MQTT connection. Enables support for SOCKS or 850 HTTP proxies. 851 852 Proxying is done through the PySocks library. Brief descriptions of the 853 proxy_args parameters are below; see the PySocks docs for more info. 854 855 (Required) 856 proxy_type: One of {socks.HTTP, socks.SOCKS4, or socks.SOCKS5} 857 proxy_addr: IP address or DNS name of proxy server 858 859 (Optional) 860 proxy_rdns: boolean indicating whether proxy lookup should be performed 861 remotely (True, default) or locally (False) 862 proxy_username: username for SOCKS5 proxy, or userid for SOCKS4 proxy 863 proxy_password: password for SOCKS5 proxy 864 865 Must be called before connect() or connect_async().""" 866 if socks is None: 867 raise ValueError("PySocks must be installed for proxy support.") 868 elif not self._proxy_is_valid(proxy_args): 869 raise ValueError("proxy_type and/or proxy_addr are invalid.") 870 else: 871 self._proxy = proxy_args 872 873 def enable_logger(self, logger=None): 874 """ Enables a logger to send log messages to """ 875 if logger is None: 876 if self._logger is not None: 877 # Do not replace existing logger 878 return 879 logger = logging.getLogger(__name__) 880 self._logger = logger 881 882 def disable_logger(self): 883 self._logger = None 884 885 def connect(self, host, port=1883, keepalive=60, bind_address="", bind_port=0, 886 clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None): 887 """Connect to a remote broker. 888 889 host is the hostname or IP address of the remote broker. 890 port is the network port of the server host to connect to. Defaults to 891 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you 892 are using tls_set() the port may need providing. 893 keepalive: Maximum period in seconds between communications with the 894 broker. If no other messages are being exchanged, this controls the 895 rate at which the client will send ping messages to the broker. 896 clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. 897 Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only, 898 respectively. MQTT session data (such as outstanding messages and subscriptions) 899 is cleared on successful connect when the clean_start flag is set. 900 properties: (MQTT v5.0 only) the MQTT v5.0 properties to be sent in the 901 MQTT connect packet. 902 """ 903 904 if self._protocol == MQTTv5: 905 self._mqttv5_first_connect = True 906 else: 907 if clean_start != MQTT_CLEAN_START_FIRST_ONLY: 908 raise ValueError("Clean start only applies to MQTT V5") 909 if properties != None: 910 raise ValueError("Properties only apply to MQTT V5") 911 912 self.connect_async(host, port, keepalive, 913 bind_address, bind_port, clean_start, properties) 914 return self.reconnect() 915 916 def connect_srv(self, domain=None, keepalive=60, bind_address="", 917 clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None): 918 """Connect to a remote broker. 919 920 domain is the DNS domain to search for SRV records; if None, 921 try to determine local domain name. 922 keepalive, bind_address, clean_start and properties are as for connect() 923 """ 924 925 if HAVE_DNS is False: 926 raise ValueError( 927 'No DNS resolver library found, try "pip install dnspython" or "pip3 install dnspython3".') 928 929 if domain is None: 930 domain = socket.getfqdn() 931 domain = domain[domain.find('.') + 1:] 932 933 try: 934 rr = '_mqtt._tcp.%s' % domain 935 if self._ssl: 936 # IANA specifies secure-mqtt (not mqtts) for port 8883 937 rr = '_secure-mqtt._tcp.%s' % domain 938 answers = [] 939 for answer in dns.resolver.query(rr, dns.rdatatype.SRV): 940 addr = answer.target.to_text()[:-1] 941 answers.append( 942 (addr, answer.port, answer.priority, answer.weight)) 943 except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers): 944 raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain)) 945 946 # FIXME: doesn't account for weight 947 for answer in answers: 948 host, port, prio, weight = answer 949 950 try: 951 return self.connect(host, port, keepalive, bind_address, clean_start, properties) 952 except Exception: 953 pass 954 955 raise ValueError("No SRV hosts responded") 956 957 def connect_async(self, host, port=1883, keepalive=60, bind_address="", bind_port=0, 958 clean_start=MQTT_CLEAN_START_FIRST_ONLY, properties=None): 959 """Connect to a remote broker asynchronously. This is a non-blocking 960 connect call that can be used with loop_start() to provide very quick 961 start. 962 963 host is the hostname or IP address of the remote broker. 964 port is the network port of the server host to connect to. Defaults to 965 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you 966 are using tls_set() the port may need providing. 967 keepalive: Maximum period in seconds between communications with the 968 broker. If no other messages are being exchanged, this controls the 969 rate at which the client will send ping messages to the broker. 970 clean_start: (MQTT v5.0 only) True, False or MQTT_CLEAN_START_FIRST_ONLY. 971 Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only, 972 respectively. MQTT session data (such as outstanding messages and subscriptions) 973 is cleared on successful connect when the clean_start flag is set. 974 properties: (MQTT v5.0 only) the MQTT v5.0 properties to be sent in the 975 MQTT connect packet. Use the Properties class. 976 """ 977 if host is None or len(host) == 0: 978 raise ValueError('Invalid host.') 979 if port <= 0: 980 raise ValueError('Invalid port number.') 981 if keepalive < 0: 982 raise ValueError('Keepalive must be >=0.') 983 if bind_address != "" and bind_address is not None: 984 if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 2): 985 raise ValueError('bind_address requires Python 2.7 or 3.2.') 986 if bind_port < 0: 987 raise ValueError('Invalid bind port number.') 988 989 self._host = host 990 self._port = port 991 self._keepalive = keepalive 992 self._bind_address = bind_address 993 self._bind_port = bind_port 994 self._clean_start = clean_start 995 self._connect_properties = properties 996 self._state = mqtt_cs_connect_async 997 998 999 def reconnect_delay_set(self, min_delay=1, max_delay=120): 1000 """ Configure the exponential reconnect delay 1001 1002 When connection is lost, wait initially min_delay seconds and 1003 double this time every attempt. The wait is capped at max_delay. 1004 Once the client is fully connected (e.g. not only TCP socket, but 1005 received a success CONNACK), the wait timer is reset to min_delay. 1006 """ 1007 with self._reconnect_delay_mutex: 1008 self._reconnect_min_delay = min_delay 1009 self._reconnect_max_delay = max_delay 1010 self._reconnect_delay = None 1011 1012 def reconnect(self): 1013 """Reconnect the client after a disconnect. Can only be called after 1014 connect()/connect_async().""" 1015 if len(self._host) == 0: 1016 raise ValueError('Invalid host.') 1017 if self._port <= 0: 1018 raise ValueError('Invalid port number.') 1019 1020 self._in_packet = { 1021 "command": 0, 1022 "have_remaining": 0, 1023 "remaining_count": [], 1024 "remaining_mult": 1, 1025 "remaining_length": 0, 1026 "packet": bytearray(b""), 1027 "to_process": 0, 1028 "pos": 0} 1029 1030 self._out_packet = collections.deque() 1031 1032 with self._msgtime_mutex: 1033 self._last_msg_in = time_func() 1034 self._last_msg_out = time_func() 1035 1036 self._ping_t = 0 1037 self._state = mqtt_cs_new 1038 1039 self._sock_close() 1040 1041 # Put messages in progress in a valid state. 1042 self._messages_reconnect_reset() 1043 1044 sock = self._create_socket_connection() 1045 1046 if self._ssl: 1047 # SSL is only supported when SSLContext is available (implies Python >= 2.7.9 or >= 3.2) 1048 1049 verify_host = not self._tls_insecure 1050 try: 1051 # Try with server_hostname, even it's not supported in certain scenarios 1052 sock = self._ssl_context.wrap_socket( 1053 sock, 1054 server_hostname=self._host, 1055 do_handshake_on_connect=False, 1056 ) 1057 except ssl.CertificateError: 1058 # CertificateError is derived from ValueError 1059 raise 1060 except ValueError: 1061 # Python version requires SNI in order to handle server_hostname, but SNI is not available 1062 sock = self._ssl_context.wrap_socket( 1063 sock, 1064 do_handshake_on_connect=False, 1065 ) 1066 else: 1067 # If SSL context has already checked hostname, then don't need to do it again 1068 if (hasattr(self._ssl_context, 'check_hostname') and 1069 self._ssl_context.check_hostname): 1070 verify_host = False 1071 1072 sock.settimeout(self._keepalive) 1073 sock.do_handshake() 1074 1075 if verify_host: 1076 ssl.match_hostname(sock.getpeercert(), self._host) 1077 1078 if self._transport == "websockets": 1079 sock.settimeout(self._keepalive) 1080 sock = WebsocketWrapper(sock, self._host, self._port, self._ssl, 1081 self._websocket_path, self._websocket_extra_headers) 1082 1083 self._sock = sock 1084 self._sock.setblocking(0) 1085 self._registered_write = False 1086 self._call_socket_open() 1087 1088 return self._send_connect(self._keepalive) 1089 1090 def loop(self, timeout=1.0, max_packets=1): 1091 """Process network events. 1092 1093 It is strongly recommended that you use loop_start(), or 1094 loop_forever(), or if you are using an external event loop using 1095 loop_read(), loop_write(), and loop_misc(). Using loop() on it's own is 1096 no longer recommended. 1097 1098 This function must be called regularly to ensure communication with the 1099 broker is carried out. It calls select() on the network socket to wait 1100 for network events. If incoming data is present it will then be 1101 processed. Outgoing commands, from e.g. publish(), are normally sent 1102 immediately that their function is called, but this is not always 1103 possible. loop() will also attempt to send any remaining outgoing 1104 messages, which also includes commands that are part of the flow for 1105 messages with QoS>0. 1106 1107 timeout: The time in seconds to wait for incoming/outgoing network 1108 traffic before timing out and returning. 1109 max_packets: Not currently used. 1110 1111 Returns MQTT_ERR_SUCCESS on success. 1112 Returns >0 on error. 1113 1114 A ValueError will be raised if timeout < 0""" 1115 1116 if self._sockpairR is None or self._sockpairW is None: 1117 self._reset_sockets(sockpair_only=True) 1118 self._sockpairR, self._sockpairW = _socketpair_compat() 1119 1120 return self._loop(timeout) 1121 1122 def _loop(self, timeout=1.0): 1123 if timeout < 0.0: 1124 raise ValueError('Invalid timeout.') 1125 1126 try: 1127 packet = self._out_packet.popleft() 1128 self._out_packet.appendleft(packet) 1129 wlist = [self._sock] 1130 except IndexError: 1131 wlist = [] 1132 1133 # used to check if there are any bytes left in the (SSL) socket 1134 pending_bytes = 0 1135 if hasattr(self._sock, 'pending'): 1136 pending_bytes = self._sock.pending() 1137 1138 # if bytes are pending do not wait in select 1139 if pending_bytes > 0: 1140 timeout = 0.0 1141 1142 # sockpairR is used to break out of select() before the timeout, on a 1143 # call to publish() etc. 1144 if self._sockpairR is None: 1145 rlist = [self._sock] 1146 else: 1147 rlist = [self._sock, self._sockpairR] 1148 1149 try: 1150 socklist = select.select(rlist, wlist, [], timeout) 1151 except TypeError: 1152 # Socket isn't correct type, in likelihood connection is lost 1153 return MQTT_ERR_CONN_LOST 1154 except ValueError: 1155 # Can occur if we just reconnected but rlist/wlist contain a -1 for 1156 # some reason. 1157 return MQTT_ERR_CONN_LOST 1158 except Exception: 1159 # Note that KeyboardInterrupt, etc. can still terminate since they 1160 # are not derived from Exception 1161 return MQTT_ERR_UNKNOWN 1162 1163 if self._sock in socklist[0] or pending_bytes > 0: 1164 rc = self.loop_read() 1165 if rc or self._sock is None: 1166 return rc 1167 1168 if self._sockpairR and self._sockpairR in socklist[0]: 1169 # Stimulate output write even though we didn't ask for it, because 1170 # at that point the publish or other command wasn't present. 1171 socklist[1].insert(0, self._sock) 1172 # Clear sockpairR - only ever a single byte written. 1173 try: 1174 # Read many bytes at once - this allows up to 10000 calls to 1175 # publish() inbetween calls to loop(). 1176 self._sockpairR.recv(10000) 1177 except BlockingIOError: 1178 pass 1179 1180 if self._sock in socklist[1]: 1181 rc = self.loop_write() 1182 if rc or self._sock is None: 1183 return rc 1184 1185 return self.loop_misc() 1186 1187 def publish(self, topic, payload=None, qos=0, retain=False, properties=None): 1188 """Publish a message on a topic. 1189 1190 This causes a message to be sent to the broker and subsequently from 1191 the broker to any clients subscribing to matching topics. 1192 1193 topic: The topic that the message should be published on. 1194 payload: The actual message to send. If not given, or set to None a 1195 zero length message will be used. Passing an int or float will result 1196 in the payload being converted to a string representing that number. If 1197 you wish to send a true int/float, use struct.pack() to create the 1198 payload you require. 1199 qos: The quality of service level to use. 1200 retain: If set to true, the message will be set as the "last known 1201 good"/retained message for the topic. 1202 properties: (MQTT v5.0 only) the MQTT v5.0 properties to be included. 1203 Use the Properties class. 1204 1205 Returns a MQTTMessageInfo class, which can be used to determine whether 1206 the message has been delivered (using info.is_published()) or to block 1207 waiting for the message to be delivered (info.wait_for_publish()). The 1208 message ID and return code of the publish() call can be found at 1209 info.mid and info.rc. 1210 1211 For backwards compatibility, the MQTTMessageInfo class is iterable so 1212 the old construct of (rc, mid) = client.publish(...) is still valid. 1213 1214 rc is MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the 1215 client is not currently connected. mid is the message ID for the 1216 publish request. The mid value can be used to track the publish request 1217 by checking against the mid argument in the on_publish() callback if it 1218 is defined. 1219 1220 A ValueError will be raised if topic is None, has zero length or is 1221 invalid (contains a wildcard), except if the MQTT version used is v5.0. 1222 For v5.0, a zero length topic can be used when a Topic Alias has been set. 1223 1224 A ValueError will be raised if qos is not one of 0, 1 or 2, or if 1225 the length of the payload is greater than 268435455 bytes.""" 1226 if self._protocol != MQTTv5: 1227 if topic is None or len(topic) == 0: 1228 raise ValueError('Invalid topic.') 1229 1230 topic = topic.encode('utf-8') 1231 1232 if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS: 1233 raise ValueError('Publish topic cannot contain wildcards.') 1234 1235 if qos < 0 or qos > 2: 1236 raise ValueError('Invalid QoS level.') 1237 1238 if isinstance(payload, unicode): 1239 local_payload = payload.encode('utf-8') 1240 elif isinstance(payload, (bytes, bytearray)): 1241 local_payload = payload 1242 elif isinstance(payload, (int, float)): 1243 local_payload = str(payload).encode('ascii') 1244 elif payload is None: 1245 local_payload = b'' 1246 else: 1247 raise TypeError( 1248 'payload must be a string, bytearray, int, float or None.') 1249 1250 if len(local_payload) > 268435455: 1251 raise ValueError('Payload too large.') 1252 1253 local_mid = self._mid_generate() 1254 1255 if qos == 0: 1256 info = MQTTMessageInfo(local_mid) 1257 rc = self._send_publish( 1258 local_mid, topic, local_payload, qos, retain, False, info, properties) 1259 info.rc = rc 1260 return info 1261 else: 1262 message = MQTTMessage(local_mid, topic) 1263 message.timestamp = time_func() 1264 message.payload = local_payload 1265 message.qos = qos 1266 message.retain = retain 1267 message.dup = False 1268 message.properties = properties 1269 1270 with self._out_message_mutex: 1271 if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages: 1272 message.info.rc = MQTT_ERR_QUEUE_SIZE 1273 return message.info 1274 1275 if local_mid in self._out_messages: 1276 message.info.rc = MQTT_ERR_QUEUE_SIZE 1277 return message.info 1278 1279 self._out_messages[message.mid] = message 1280 if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: 1281 self._inflight_messages += 1 1282 if qos == 1: 1283 message.state = mqtt_ms_wait_for_puback 1284 elif qos == 2: 1285 message.state = mqtt_ms_wait_for_pubrec 1286 1287 rc = self._send_publish(message.mid, topic, message.payload, message.qos, message.retain, 1288 message.dup, message.info, message.properties) 1289 1290 # remove from inflight messages so it will be send after a connection is made 1291 if rc is MQTT_ERR_NO_CONN: 1292 self._inflight_messages -= 1 1293 message.state = mqtt_ms_publish 1294 1295 message.info.rc = rc 1296 return message.info 1297 else: 1298 message.state = mqtt_ms_queued 1299 message.info.rc = MQTT_ERR_SUCCESS 1300 return message.info 1301 1302 def username_pw_set(self, username, password=None): 1303 """Set a username and optionally a password for broker authentication. 1304 1305 Must be called before connect() to have any effect. 1306 Requires a broker that supports MQTT v3.1. 1307 1308 username: The username to authenticate with. Need have no relationship to the client id. Must be unicode 1309 [MQTT-3.1.3-11]. 1310 Set to None to reset client back to not using username/password for broker authentication. 1311 password: The password to authenticate with. Optional, set to None if not required. If it is unicode, then it 1312 will be encoded as UTF-8. 1313 """ 1314 1315 # [MQTT-3.1.3-11] User name must be UTF-8 encoded string 1316 self._username = None if username is None else username.encode('utf-8') 1317 self._password = password 1318 if isinstance(self._password, unicode): 1319 self._password = self._password.encode('utf-8') 1320 1321 def enable_bridge_mode(self): 1322 """Sets the client in a bridge mode instead of client mode. 1323 1324 Must be called before connect() to have any effect. 1325 Requires brokers that support bridge mode. 1326 1327 Under bridge mode, the broker will identify the client as a bridge and 1328 not send it's own messages back to it. Hence a subsciption of # is 1329 possible without message loops. This feature also correctly propagates 1330 the retain flag on the messages. 1331 1332 Currently Mosquitto and RSMB support this feature. This feature can 1333 be used to create a bridge between multiple broker. 1334 """ 1335 self._client_mode = MQTT_BRIDGE 1336 1337 def is_connected(self): 1338 """Returns the current status of the connection 1339 1340 True if connection exists 1341 False if connection is closed 1342 """ 1343 return self._state == mqtt_cs_connected 1344 1345 def disconnect(self, reasoncode=None, properties=None): 1346 """Disconnect a connected client from the broker. 1347 reasoncode: (MQTT v5.0 only) a ReasonCodes instance setting the MQTT v5.0 1348 reasoncode to be sent with the disconnect. It is optional, the receiver 1349 then assuming that 0 (success) is the value. 1350 properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties 1351 to be included. Optional - if not set, no properties are sent. 1352 """ 1353 self._state = mqtt_cs_disconnecting 1354 1355 if self._sock is None: 1356 return MQTT_ERR_NO_CONN 1357 1358 return self._send_disconnect(reasoncode, properties) 1359 1360 def subscribe(self, topic, qos=0, options=None, properties=None): 1361 """Subscribe the client to one or more topics. 1362 1363 This function may be called in three different ways (and a further three for MQTT v5.0): 1364 1365 Simple string and integer 1366 ------------------------- 1367 e.g. subscribe("my/topic", 2) 1368 1369 topic: A string specifying the subscription topic to subscribe to. 1370 qos: The desired quality of service level for the subscription. 1371 Defaults to 0. 1372 options and properties: Not used. 1373 1374 Simple string and subscribe options (MQTT v5.0 only) 1375 ---------------------------------------------------- 1376 e.g. subscribe("my/topic", options=SubscribeOptions(qos=2)) 1377 1378 topic: A string specifying the subscription topic to subscribe to. 1379 qos: Not used. 1380 options: The MQTT v5.0 subscribe options. 1381 properties: a Properties instance setting the MQTT v5.0 properties 1382 to be included. Optional - if not set, no properties are sent. 1383 1384 String and integer tuple 1385 ------------------------ 1386 e.g. subscribe(("my/topic", 1)) 1387 1388 topic: A tuple of (topic, qos). Both topic and qos must be present in 1389 the tuple. 1390 qos and options: Not used. 1391 properties: Only used for MQTT v5.0. A Properties instance setting the 1392 MQTT v5.0 properties. Optional - if not set, no properties are sent. 1393 1394 String and subscribe options tuple (MQTT v5.0 only) 1395 --------------------------------------------------- 1396 e.g. subscribe(("my/topic", SubscribeOptions(qos=1))) 1397 1398 topic: A tuple of (topic, SubscribeOptions). Both topic and subscribe 1399 options must be present in the tuple. 1400 qos and options: Not used. 1401 properties: a Properties instance setting the MQTT v5.0 properties 1402 to be included. Optional - if not set, no properties are sent. 1403 1404 List of string and integer tuples 1405 --------------------------------- 1406 e.g. subscribe([("my/topic", 0), ("another/topic", 2)]) 1407 1408 This allows multiple topic subscriptions in a single SUBSCRIPTION 1409 command, which is more efficient than using multiple calls to 1410 subscribe(). 1411 1412 topic: A list of tuple of format (topic, qos). Both topic and qos must 1413 be present in all of the tuples. 1414 qos, options and properties: Not used. 1415 1416 List of string and subscribe option tuples (MQTT v5.0 only) 1417 ----------------------------------------------------------- 1418 e.g. subscribe([("my/topic", SubscribeOptions(qos=0), ("another/topic", SubscribeOptions(qos=2)]) 1419 1420 This allows multiple topic subscriptions in a single SUBSCRIPTION 1421 command, which is more efficient than using multiple calls to 1422 subscribe(). 1423 1424 topic: A list of tuple of format (topic, SubscribeOptions). Both topic and subscribe 1425 options must be present in all of the tuples. 1426 qos and options: Not used. 1427 properties: a Properties instance setting the MQTT v5.0 properties 1428 to be included. Optional - if not set, no properties are sent. 1429 1430 The function returns a tuple (result, mid), where result is 1431 MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the 1432 client is not currently connected. mid is the message ID for the 1433 subscribe request. The mid value can be used to track the subscribe 1434 request by checking against the mid argument in the on_subscribe() 1435 callback if it is defined. 1436 1437 Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has 1438 zero string length, or if topic is not a string, tuple or list. 1439 """ 1440 topic_qos_list = None 1441 1442 if isinstance(topic, tuple): 1443 if self._protocol == MQTTv5: 1444 topic, options = topic 1445 if not isinstance(options, SubscribeOptions): 1446 raise ValueError( 1447 'Subscribe options must be instance of SubscribeOptions class.') 1448 else: 1449 topic, qos = topic 1450 1451 if isinstance(topic, basestring): 1452 if qos < 0 or qos > 2: 1453 raise ValueError('Invalid QoS level.') 1454 if self._protocol == MQTTv5: 1455 if options is None: 1456 # if no options are provided, use the QoS passed instead 1457 options = SubscribeOptions(qos=qos) 1458 elif qos != 0: 1459 raise ValueError( 1460 'Subscribe options and qos parameters cannot be combined.') 1461 if not isinstance(options, SubscribeOptions): 1462 raise ValueError( 1463 'Subscribe options must be instance of SubscribeOptions class.') 1464 topic_qos_list = [(topic.encode('utf-8'), options)] 1465 else: 1466 if topic is None or len(topic) == 0: 1467 raise ValueError('Invalid topic.') 1468 topic_qos_list = [(topic.encode('utf-8'), qos)] 1469 elif isinstance(topic, list): 1470 topic_qos_list = [] 1471 if self._protocol == MQTTv5: 1472 for t, o in topic: 1473 if not isinstance(o, SubscribeOptions): 1474 # then the second value should be QoS 1475 if o < 0 or o > 2: 1476 raise ValueError('Invalid QoS level.') 1477 o = SubscribeOptions(qos=o) 1478 topic_qos_list.append((t.encode('utf-8'), o)) 1479 else: 1480 for t, q in topic: 1481 if q < 0 or q > 2: 1482 raise ValueError('Invalid QoS level.') 1483 if t is None or len(t) == 0 or not isinstance(t, basestring): 1484 raise ValueError('Invalid topic.') 1485 topic_qos_list.append((t.encode('utf-8'), q)) 1486 1487 if topic_qos_list is None: 1488 raise ValueError("No topic specified, or incorrect topic type.") 1489 1490 if any(self._filter_wildcard_len_check(topic) != MQTT_ERR_SUCCESS for topic, _ in topic_qos_list): 1491 raise ValueError('Invalid subscription filter.') 1492 1493 if self._sock is None: 1494 return (MQTT_ERR_NO_CONN, None) 1495 1496 return self._send_subscribe(False, topic_qos_list, properties) 1497 1498 def unsubscribe(self, topic, properties=None): 1499 """Unsubscribe the client from one or more topics. 1500 1501 topic: A single string, or list of strings that are the subscription 1502 topics to unsubscribe from. 1503 properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties 1504 to be included. Optional - if not set, no properties are sent. 1505 1506 Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS 1507 to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not 1508 currently connected. 1509 mid is the message ID for the unsubscribe request. The mid value can be 1510 used to track the unsubscribe request by checking against the mid 1511 argument in the on_unsubscribe() callback if it is defined. 1512 1513 Raises a ValueError if topic is None or has zero string length, or is 1514 not a string or list. 1515 """ 1516 topic_list = None 1517 if topic is None: 1518 raise ValueError('Invalid topic.') 1519 if isinstance(topic, basestring): 1520 if len(topic) == 0: 1521 raise ValueError('Invalid topic.') 1522 topic_list = [topic.encode('utf-8')] 1523 elif isinstance(topic, list): 1524 topic_list = [] 1525 for t in topic: 1526 if len(t) == 0 or not isinstance(t, basestring): 1527 raise ValueError('Invalid topic.') 1528 topic_list.append(t.encode('utf-8')) 1529 1530 if topic_list is None: 1531 raise ValueError("No topic specified, or incorrect topic type.") 1532 1533 if self._sock is None: 1534 return (MQTT_ERR_NO_CONN, None) 1535 1536 return self._send_unsubscribe(False, topic_list, properties) 1537 1538 def loop_read(self, max_packets=1): 1539 """Process read network events. Use in place of calling loop() if you 1540 wish to handle your client reads as part of your own application. 1541 1542 Use socket() to obtain the client socket to call select() or equivalent 1543 on. 1544 1545 Do not use if you are using the threaded interface loop_start().""" 1546 if self._sock is None: 1547 return MQTT_ERR_NO_CONN 1548 1549 max_packets = len(self._out_messages) + len(self._in_messages) 1550 if max_packets < 1: 1551 max_packets = 1 1552 1553 for _ in range(0, max_packets): 1554 if self._sock is None: 1555 return MQTT_ERR_NO_CONN 1556 rc = self._packet_read() 1557 if rc > 0: 1558 return self._loop_rc_handle(rc) 1559 elif rc == MQTT_ERR_AGAIN: 1560 return MQTT_ERR_SUCCESS 1561 return MQTT_ERR_SUCCESS 1562 1563 def loop_write(self, max_packets=1): 1564 """Process write network events. Use in place of calling loop() if you 1565 wish to handle your client writes as part of your own application. 1566 1567 Use socket() to obtain the client socket to call select() or equivalent 1568 on. 1569 1570 Use want_write() to determine if there is data waiting to be written. 1571 1572 Do not use if you are using the threaded interface loop_start().""" 1573 if self._sock is None: 1574 return MQTT_ERR_NO_CONN 1575 1576 try: 1577 rc = self._packet_write() 1578 if rc == MQTT_ERR_AGAIN: 1579 return MQTT_ERR_SUCCESS 1580 elif rc > 0: 1581 return self._loop_rc_handle(rc) 1582 else: 1583 return MQTT_ERR_SUCCESS 1584 finally: 1585 if self.want_write(): 1586 self._call_socket_register_write() 1587 else: 1588 self._call_socket_unregister_write() 1589 1590 def want_write(self): 1591 """Call to determine if there is network data waiting to be written. 1592 Useful if you are calling select() yourself rather than using loop(). 1593 """ 1594 try: 1595 packet = self._out_packet.popleft() 1596 self._out_packet.appendleft(packet) 1597 return True 1598 except IndexError: 1599 return False 1600 1601 def loop_misc(self): 1602 """Process miscellaneous network events. Use in place of calling loop() if you 1603 wish to call select() or equivalent on. 1604 1605 Do not use if you are using the threaded interface loop_start().""" 1606 if self._sock is None: 1607 return MQTT_ERR_NO_CONN 1608 1609 now = time_func() 1610 self._check_keepalive() 1611 1612 if self._ping_t > 0 and now - self._ping_t >= self._keepalive: 1613 # client->ping_t != 0 means we are waiting for a pingresp. 1614 # This hasn't happened in the keepalive time so we should disconnect. 1615 self._sock_close() 1616 1617 if self._state == mqtt_cs_disconnecting: 1618 rc = MQTT_ERR_SUCCESS 1619 else: 1620 rc = MQTT_ERR_KEEPALIVE 1621 1622 self._do_on_disconnect(rc) 1623 1624 return MQTT_ERR_CONN_LOST 1625 1626 return MQTT_ERR_SUCCESS 1627 1628 def max_inflight_messages_set(self, inflight): 1629 """Set the maximum number of messages with QoS>0 that can be part way 1630 through their network flow at once. Defaults to 20.""" 1631 if inflight < 0: 1632 raise ValueError('Invalid inflight.') 1633 self._max_inflight_messages = inflight 1634 1635 def max_queued_messages_set(self, queue_size): 1636 """Set the maximum number of messages in the outgoing message queue. 1637 0 means unlimited.""" 1638 if queue_size < 0: 1639 raise ValueError('Invalid queue size.') 1640 if not isinstance(queue_size, int): 1641 raise ValueError('Invalid type of queue size.') 1642 self._max_queued_messages = queue_size 1643 return self 1644 1645 def message_retry_set(self, retry): 1646 """No longer used, remove in version 2.0""" 1647 pass 1648 1649 def user_data_set(self, userdata): 1650 """Set the user data variable passed to callbacks. May be any data type.""" 1651 self._userdata = userdata 1652 1653 def will_set(self, topic, payload=None, qos=0, retain=False, properties=None): 1654 """Set a Will to be sent by the broker in case the client disconnects unexpectedly. 1655 1656 This must be called before connect() to have any effect. 1657 1658 topic: The topic that the will message should be published on. 1659 payload: The message to send as a will. If not given, or set to None a 1660 zero length message will be used as the will. Passing an int or float 1661 will result in the payload being converted to a string representing 1662 that number. If you wish to send a true int/float, use struct.pack() to 1663 create the payload you require. 1664 qos: The quality of service level to use for the will. 1665 retain: If set to true, the will message will be set as the "last known 1666 good"/retained message for the topic. 1667 properties: (MQTT v5.0 only) a Properties instance setting the MQTT v5.0 properties 1668 to be included with the will message. Optional - if not set, no properties are sent. 1669 1670 Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has 1671 zero string length. 1672 """ 1673 if topic is None or len(topic) == 0: 1674 raise ValueError('Invalid topic.') 1675 1676 if qos < 0 or qos > 2: 1677 raise ValueError('Invalid QoS level.') 1678 1679 if properties != None and not isinstance(properties, Properties): 1680 raise ValueError( 1681 "The properties argument must be an instance of the Properties class.") 1682 1683 if isinstance(payload, unicode): 1684 self._will_payload = payload.encode('utf-8') 1685 elif isinstance(payload, (bytes, bytearray)): 1686 self._will_payload = payload 1687 elif isinstance(payload, (int, float)): 1688 self._will_payload = str(payload).encode('ascii') 1689 elif payload is None: 1690 self._will_payload = b"" 1691 else: 1692 raise TypeError( 1693 'payload must be a string, bytearray, int, float or None.') 1694 1695 self._will = True 1696 self._will_topic = topic.encode('utf-8') 1697 self._will_qos = qos 1698 self._will_retain = retain 1699 self._will_properties = properties 1700 1701 def will_clear(self): 1702 """ Removes a will that was previously configured with will_set(). 1703 1704 Must be called before connect() to have any effect.""" 1705 self._will = False 1706 self._will_topic = b"" 1707 self._will_payload = b"" 1708 self._will_qos = 0 1709 self._will_retain = False 1710 1711 def socket(self): 1712 """Return the socket or ssl object for this client.""" 1713 return self._sock 1714 1715 def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False): 1716 """This function calls the network loop functions for you in an 1717 infinite blocking loop. It is useful for the case where you only want 1718 to run the MQTT client loop in your program. 1719 1720 loop_forever() will handle reconnecting for you if reconnect_on_failure is 1721 true (this is the default behavior). If you call disconnect() in a callback 1722 it will return. 1723 1724 1725 timeout: The time in seconds to wait for incoming/outgoing network 1726 traffic before timing out and returning. 1727 max_packets: Not currently used. 1728 retry_first_connection: Should the first connection attempt be retried on failure. 1729 This is independent of the reconnect_on_failure setting. 1730 1731 Raises OSError/WebsocketConnectionError on first connection failures unless retry_first_connection=True 1732 """ 1733 1734 run = True 1735 1736 while run: 1737 if self._thread_terminate is True: 1738 break 1739 1740 if self._state == mqtt_cs_connect_async: 1741 try: 1742 self.reconnect() 1743 except (OSError, WebsocketConnectionError): 1744 self._handle_on_connect_fail() 1745 if not retry_first_connection: 1746 raise 1747 self._easy_log( 1748 MQTT_LOG_DEBUG, "Connection failed, retrying") 1749 self._reconnect_wait() 1750 else: 1751 break 1752 1753 while run: 1754 rc = MQTT_ERR_SUCCESS 1755 while rc == MQTT_ERR_SUCCESS: 1756 rc = self._loop(timeout) 1757 # We don't need to worry about locking here, because we've 1758 # either called loop_forever() when in single threaded mode, or 1759 # in multi threaded mode when loop_stop() has been called and 1760 # so no other threads can access _out_packet or _messages. 1761 if (self._thread_terminate is True 1762 and len(self._out_packet) == 0 1763 and len(self._out_messages) == 0): 1764 rc = 1 1765 run = False 1766 1767 def should_exit(): 1768 return self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True 1769 1770 if should_exit() or not self._reconnect_on_failure: 1771 run = False 1772 else: 1773 self._reconnect_wait() 1774 1775 if should_exit(): 1776 run = False 1777 else: 1778 try: 1779 self.reconnect() 1780 except (OSError, WebsocketConnectionError): 1781 self._handle_on_connect_fail() 1782 self._easy_log( 1783 MQTT_LOG_DEBUG, "Connection failed, retrying") 1784 1785 return rc 1786 1787 def loop_start(self): 1788 """This is part of the threaded client interface. Call this once to 1789 start a new thread to process network traffic. This provides an 1790 alternative to repeatedly calling loop() yourself. 1791 """ 1792 if self._thread is not None: 1793 return MQTT_ERR_INVAL 1794 1795 self._sockpairR, self._sockpairW = _socketpair_compat() 1796 self._thread_terminate = False 1797 self._thread = threading.Thread(target=self._thread_main) 1798 self._thread.daemon = True 1799 self._thread.start() 1800 1801 def loop_stop(self, force=False): 1802 """This is part of the threaded client interface. Call this once to 1803 stop the network thread previously created with loop_start(). This call 1804 will block until the network thread finishes. 1805 1806 The force parameter is currently ignored. 1807 """ 1808 if self._thread is None: 1809 return MQTT_ERR_INVAL 1810 1811 self._thread_terminate = True 1812 if threading.current_thread() != self._thread: 1813 self._thread.join() 1814 self._thread = None 1815 1816 @property 1817 def on_log(self): 1818 """If implemented, called when the client has log information. 1819 Defined to allow debugging.""" 1820 return self._on_log 1821 1822 @on_log.setter 1823 def on_log(self, func): 1824 """ Define the logging callback implementation. 1825 1826 Expected signature is: 1827 log_callback(client, userdata, level, buf) 1828 1829 client: the client instance for this callback 1830 userdata: the private user data as set in Client() or userdata_set() 1831 level: gives the severity of the message and will be one of 1832 MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, 1833 MQTT_LOG_ERR, and MQTT_LOG_DEBUG. 1834 buf: the message itself 1835 1836 Decorator: @client.log_callback() (```client``` is the name of the 1837 instance which this callback is being attached to) 1838 """ 1839 self._on_log = func 1840 1841 def log_callback(self): 1842 def decorator(func): 1843 self.on_log = func 1844 return func 1845 return decorator 1846 1847 @property 1848 def on_connect(self): 1849 """If implemented, called when the broker responds to our connection 1850 request.""" 1851 return self._on_connect 1852 1853 @on_connect.setter 1854 def on_connect(self, func): 1855 """ Define the connect callback implementation. 1856 1857 Expected signature for MQTT v3.1 and v3.1.1 is: 1858 connect_callback(client, userdata, flags, rc) 1859 1860 and for MQTT v5.0: 1861 connect_callback(client, userdata, flags, reasonCode, properties) 1862 1863 client: the client instance for this callback 1864 userdata: the private user data as set in Client() or userdata_set() 1865 flags: response flags sent by the broker 1866 rc: the connection result 1867 reasonCode: the MQTT v5.0 reason code: an instance of the ReasonCode class. 1868 ReasonCode may be compared to integer. 1869 properties: the MQTT v5.0 properties returned from the broker. An instance 1870 of the Properties class. 1871 For MQTT v3.1 and v3.1.1 properties is not provided but for compatibility 1872 with MQTT v5.0, we recommend adding properties=None. 1873 1874 flags is a dict that contains response flags from the broker: 1875 flags['session present'] - this flag is useful for clients that are 1876 using clean session set to 0 only. If a client with clean 1877 session=0, that reconnects to a broker that it has previously 1878 connected to, this flag indicates whether the broker still has the 1879 session information for the client. If 1, the session still exists. 1880 1881 The value of rc indicates success or not: 1882 0: Connection successful 1883 1: Connection refused - incorrect protocol version 1884 2: Connection refused - invalid client identifier 1885 3: Connection refused - server unavailable 1886 4: Connection refused - bad username or password 1887 5: Connection refused - not authorised 1888 6-255: Currently unused. 1889 1890 Decorator: @client.connect_callback() (```client``` is the name of the 1891 instance which this callback is being attached to) 1892 1893 """ 1894 with self._callback_mutex: 1895 self._on_connect = func 1896 1897 def connect_callback(self): 1898 def decorator(func): 1899 self.on_connect = func 1900 return func 1901 return decorator 1902 1903 @property 1904 def on_connect_fail(self): 1905 """If implemented, called when the client failed to connect 1906 to the broker.""" 1907 return self._on_connect_fail 1908 1909 @on_connect_fail.setter 1910 def on_connect_fail(self, func): 1911 """ Define the connection failure callback implementation 1912 1913 Expected signature is: 1914 on_connect_fail(client, userdata) 1915 1916 client: the client instance for this callback 1917 userdata: the private user data as set in Client() or userdata_set() 1918 1919 Decorator: @client.connect_fail_callback() (```client``` is the name of the 1920 instance which this callback is being attached to) 1921 1922 """ 1923 with self._callback_mutex: 1924 self._on_connect_fail = func 1925 1926 def connect_fail_callback(self): 1927 def decorator(func): 1928 self.on_connect_fail = func 1929 return func 1930 return decorator 1931 1932 @property 1933 def on_subscribe(self): 1934 """If implemented, called when the broker responds to a subscribe 1935 request.""" 1936 return self._on_subscribe 1937 1938 @on_subscribe.setter 1939 def on_subscribe(self, func): 1940 """ Define the subscribe callback implementation. 1941 1942 Expected signature for MQTT v3.1.1 and v3.1 is: 1943 subscribe_callback(client, userdata, mid, granted_qos) 1944 1945 and for MQTT v5.0: 1946 subscribe_callback(client, userdata, mid, reasonCodes, properties) 1947 1948 client: the client instance for this callback 1949 userdata: the private user data as set in Client() or userdata_set() 1950 mid: matches the mid variable returned from the corresponding 1951 subscribe() call. 1952 granted_qos: list of integers that give the QoS level the broker has 1953 granted for each of the different subscription requests. 1954 reasonCodes: the MQTT v5.0 reason codes received from the broker for each 1955 subscription. A list of ReasonCodes instances. 1956 properties: the MQTT v5.0 properties received from the broker. A 1957 list of Properties class instances. 1958 1959 Decorator: @client.subscribe_callback() (```client``` is the name of the 1960 instance which this callback is being attached to) 1961 """ 1962 with self._callback_mutex: 1963 self._on_subscribe = func 1964 1965 def subscribe_callback(self): 1966 def decorator(func): 1967 self.on_subscribe = func 1968 return func 1969 return decorator 1970 1971 @property 1972 def on_message(self): 1973 """If implemented, called when a message has been received on a topic 1974 that the client subscribes to. 1975 1976 This callback will be called for every message received. Use 1977 message_callback_add() to define multiple callbacks that will be called 1978 for specific topic filters.""" 1979 return self._on_message 1980 1981 @on_message.setter 1982 def on_message(self, func): 1983 """ Define the message received callback implementation. 1984 1985 Expected signature is: 1986 on_message_callback(client, userdata, message) 1987 1988 client: the client instance for this callback 1989 userdata: the private user data as set in Client() or userdata_set() 1990 message: an instance of MQTTMessage. 1991 This is a class with members topic, payload, qos, retain. 1992 1993 Decorator: @client.message_callback() (```client``` is the name of the 1994 instance which this callback is being attached to) 1995 1996 """ 1997 with self._callback_mutex: 1998 self._on_message = func 1999 2000 def message_callback(self): 2001 def decorator(func): 2002 self.on_message = func 2003 return func 2004 return decorator 2005 2006 @property 2007 def on_publish(self): 2008 """If implemented, called when a message that was to be sent using the 2009 publish() call has completed transmission to the broker. 2010 2011 For messages with QoS levels 1 and 2, this means that the appropriate 2012 handshakes have completed. For QoS 0, this simply means that the message 2013 has left the client. 2014 This callback is important because even if the publish() call returns 2015 success, it does not always mean that the message has been sent.""" 2016 return self._on_publish 2017 2018 @on_publish.setter 2019 def on_publish(self, func): 2020 """ Define the published message callback implementation. 2021 2022 Expected signature is: 2023 on_publish_callback(client, userdata, mid) 2024 2025 client: the client instance for this callback 2026 userdata: the private user data as set in Client() or userdata_set() 2027 mid: matches the mid variable returned from the corresponding 2028 publish() call, to allow outgoing messages to be tracked. 2029 2030 Decorator: @client.publish_callback() (```client``` is the name of the 2031 instance which this callback is being attached to) 2032 2033 """ 2034 with self._callback_mutex: 2035 self._on_publish = func 2036 2037 def publish_callback(self): 2038 def decorator(func): 2039 self.on_publish = func 2040 return func 2041 return decorator 2042 2043 @property 2044 def on_unsubscribe(self): 2045 """If implemented, called when the broker responds to an unsubscribe 2046 request.""" 2047 return self._on_unsubscribe 2048 2049 @on_unsubscribe.setter 2050 def on_unsubscribe(self, func): 2051 """ Define the unsubscribe callback implementation. 2052 2053 Expected signature for MQTT v3.1.1 and v3.1 is: 2054 unsubscribe_callback(client, userdata, mid) 2055 2056 and for MQTT v5.0: 2057 unsubscribe_callback(client, userdata, mid, properties, reasonCodes) 2058 2059 client: the client instance for this callback 2060 userdata: the private user data as set in Client() or userdata_set() 2061 mid: matches the mid variable returned from the corresponding 2062 unsubscribe() call. 2063 properties: the MQTT v5.0 properties received from the broker. A 2064 list of Properties class instances. 2065 reasonCodes: the MQTT v5.0 reason codes received from the broker for each 2066 unsubscribe topic. A list of ReasonCodes instances 2067 2068 Decorator: @client.unsubscribe_callback() (```client``` is the name of the 2069 instance which this callback is being attached to) 2070 """ 2071 with self._callback_mutex: 2072 self._on_unsubscribe = func 2073 2074 def unsubscribe_callback(self): 2075 def decorator(func): 2076 self.on_unsubscribe = func 2077 return func 2078 return decorator 2079 2080 @property 2081 def on_disconnect(self): 2082 """If implemented, called when the client disconnects from the broker. 2083 """ 2084 return self._on_disconnect 2085 2086 @on_disconnect.setter 2087 def on_disconnect(self, func): 2088 """ Define the disconnect callback implementation. 2089 2090 Expected signature for MQTT v3.1.1 and v3.1 is: 2091 disconnect_callback(client, userdata, rc) 2092 2093 and for MQTT v5.0: 2094 disconnect_callback(client, userdata, reasonCode, properties) 2095 2096 client: the client instance for this callback 2097 userdata: the private user data as set in Client() or userdata_set() 2098 rc: the disconnection result 2099 The rc parameter indicates the disconnection state. If 2100 MQTT_ERR_SUCCESS (0), the callback was called in response to 2101 a disconnect() call. If any other value the disconnection 2102 was unexpected, such as might be caused by a network error. 2103 2104 Decorator: @client.disconnect_callback() (```client``` is the name of the 2105 instance which this callback is being attached to) 2106 2107 """ 2108 with self._callback_mutex: 2109 self._on_disconnect = func 2110 2111 def disconnect_callback(self): 2112 def decorator(func): 2113 self.on_disconnect = func 2114 return func 2115 return decorator 2116 2117 @property 2118 def on_socket_open(self): 2119 """If implemented, called just after the socket was opend.""" 2120 return self._on_socket_open 2121 2122 @on_socket_open.setter 2123 def on_socket_open(self, func): 2124 """Define the socket_open callback implementation. 2125 2126 This should be used to register the socket to an external event loop for reading. 2127 2128 Expected signature is: 2129 socket_open_callback(client, userdata, socket) 2130 2131 client: the client instance for this callback 2132 userdata: the private user data as set in Client() or userdata_set() 2133 sock: the socket which was just opened. 2134 2135 Decorator: @client.socket_open_callback() (```client``` is the name of the 2136 instance which this callback is being attached to) 2137 """ 2138 with self._callback_mutex: 2139 self._on_socket_open = func 2140 2141 def socket_open_callback(self): 2142 def decorator(func): 2143 self.on_socket_open = func 2144 return func 2145 return decorator 2146 2147 def _call_socket_open(self): 2148 """Call the socket_open callback with the just-opened socket""" 2149 with self._callback_mutex: 2150 on_socket_open = self.on_socket_open 2151 2152 if on_socket_open: 2153 with self._in_callback_mutex: 2154 try: 2155 on_socket_open(self, self._userdata, self._sock) 2156 except Exception as err: 2157 self._easy_log( 2158 MQTT_LOG_ERR, 'Caught exception in on_socket_open: %s', err) 2159 if not self.suppress_exceptions: 2160 raise 2161 2162 @property 2163 def on_socket_close(self): 2164 """If implemented, called just before the socket is closed.""" 2165 return self._on_socket_close 2166 2167 @on_socket_close.setter 2168 def on_socket_close(self, func): 2169 """Define the socket_close callback implementation. 2170 2171 This should be used to unregister the socket from an external event loop for reading. 2172 2173 Expected signature is: 2174 socket_close_callback(client, userdata, socket) 2175 2176 client: the client instance for this callback 2177 userdata: the private user data as set in Client() or userdata_set() 2178 sock: the socket which is about to be closed. 2179 2180 Decorator: @client.socket_close_callback() (```client``` is the name of the 2181 instance which this callback is being attached to) 2182 """ 2183 with self._callback_mutex: 2184 self._on_socket_close = func 2185 2186 def socket_close_callback(self): 2187 def decorator(func): 2188 self.on_socket_close = func 2189 return func 2190 return decorator 2191 2192 def _call_socket_close(self, sock): 2193 """Call the socket_close callback with the about-to-be-closed socket""" 2194 with self._callback_mutex: 2195 on_socket_close = self.on_socket_close 2196 2197 if on_socket_close: 2198 with self._in_callback_mutex: 2199 try: 2200 on_socket_close(self, self._userdata, sock) 2201 except Exception as err: 2202 self._easy_log( 2203 MQTT_LOG_ERR, 'Caught exception in on_socket_close: %s', err) 2204 if not self.suppress_exceptions: 2205 raise 2206 2207 @property 2208 def on_socket_register_write(self): 2209 """If implemented, called when the socket needs writing but can't.""" 2210 return self._on_socket_register_write 2211 2212 @on_socket_register_write.setter 2213 def on_socket_register_write(self, func): 2214 """Define the socket_register_write callback implementation. 2215 2216 This should be used to register the socket with an external event loop for writing. 2217 2218 Expected signature is: 2219 socket_register_write_callback(client, userdata, socket) 2220 2221 client: the client instance for this callback 2222 userdata: the private user data as set in Client() or userdata_set() 2223 sock: the socket which should be registered for writing 2224 2225 Decorator: @client.socket_register_write_callback() (```client``` is the name of the 2226 instance which this callback is being attached to) 2227 """ 2228 with self._callback_mutex: 2229 self._on_socket_register_write = func 2230 2231 def socket_register_write_callback(self): 2232 def decorator(func): 2233 self._on_socket_register_write = func 2234 return func 2235 return decorator 2236 2237 def _call_socket_register_write(self): 2238 """Call the socket_register_write callback with the unwritable socket""" 2239 if not self._sock or self._registered_write: 2240 return 2241 self._registered_write = True 2242 with self._callback_mutex: 2243 on_socket_register_write = self.on_socket_register_write 2244 2245 if on_socket_register_write: 2246 try: 2247 on_socket_register_write( 2248 self, self._userdata, self._sock) 2249 except Exception as err: 2250 self._easy_log( 2251 MQTT_LOG_ERR, 'Caught exception in on_socket_register_write: %s', err) 2252 if not self.suppress_exceptions: 2253 raise 2254 2255 @property 2256 def on_socket_unregister_write(self): 2257 """If implemented, called when the socket doesn't need writing anymore.""" 2258 return self._on_socket_unregister_write 2259 2260 @on_socket_unregister_write.setter 2261 def on_socket_unregister_write(self, func): 2262 """Define the socket_unregister_write callback implementation. 2263 2264 This should be used to unregister the socket from an external event loop for writing. 2265 2266 Expected signature is: 2267 socket_unregister_write_callback(client, userdata, socket) 2268 2269 client: the client instance for this callback 2270 userdata: the private user data as set in Client() or userdata_set() 2271 sock: the socket which should be unregistered for writing 2272 2273 Decorator: @client.socket_unregister_write_callback() (```client``` is the name of the 2274 instance which this callback is being attached to) 2275 """ 2276 with self._callback_mutex: 2277 self._on_socket_unregister_write = func 2278 2279 def socket_unregister_write_callback(self): 2280 def decorator(func): 2281 self._on_socket_unregister_write = func 2282 return func 2283 return decorator 2284 2285 def _call_socket_unregister_write(self, sock=None): 2286 """Call the socket_unregister_write callback with the writable socket""" 2287 sock = sock or self._sock 2288 if not sock or not self._registered_write: 2289 return 2290 self._registered_write = False 2291 2292 with self._callback_mutex: 2293 on_socket_unregister_write = self.on_socket_unregister_write 2294 2295 if on_socket_unregister_write: 2296 try: 2297 on_socket_unregister_write(self, self._userdata, sock) 2298 except Exception as err: 2299 self._easy_log( 2300 MQTT_LOG_ERR, 'Caught exception in on_socket_unregister_write: %s', err) 2301 if not self.suppress_exceptions: 2302 raise 2303 2304 def message_callback_add(self, sub, callback): 2305 """Register a message callback for a specific topic. 2306 Messages that match 'sub' will be passed to 'callback'. Any 2307 non-matching messages will be passed to the default on_message 2308 callback. 2309 2310 Call multiple times with different 'sub' to define multiple topic 2311 specific callbacks. 2312 2313 Topic specific callbacks may be removed with 2314 message_callback_remove().""" 2315 if callback is None or sub is None: 2316 raise ValueError("sub and callback must both be defined.") 2317 2318 with self._callback_mutex: 2319 self._on_message_filtered[sub] = callback 2320 2321 def topic_callback(self, sub): 2322 def decorator(func): 2323 self.message_callback_add(sub, func) 2324 return func 2325 return decorator 2326 2327 def message_callback_remove(self, sub): 2328 """Remove a message callback previously registered with 2329 message_callback_add().""" 2330 if sub is None: 2331 raise ValueError("sub must defined.") 2332 2333 with self._callback_mutex: 2334 try: 2335 del self._on_message_filtered[sub] 2336 except KeyError: # no such subscription 2337 pass 2338 2339 # ============================================================ 2340 # Private functions 2341 # ============================================================ 2342 2343 def _loop_rc_handle(self, rc, properties=None): 2344 if rc: 2345 self._sock_close() 2346 2347 if self._state == mqtt_cs_disconnecting: 2348 rc = MQTT_ERR_SUCCESS 2349 2350 self._do_on_disconnect(rc, properties) 2351 2352 return rc 2353 2354 def _packet_read(self): 2355 # This gets called if pselect() indicates that there is network data 2356 # available - ie. at least one byte. What we do depends on what data we 2357 # already have. 2358 # If we've not got a command, attempt to read one and save it. This should 2359 # always work because it's only a single byte. 2360 # Then try to read the remaining length. This may fail because it is may 2361 # be more than one byte - will need to save data pending next read if it 2362 # does fail. 2363 # Then try to read the remaining payload, where 'payload' here means the 2364 # combined variable header and actual payload. This is the most likely to 2365 # fail due to longer length, so save current data and current position. 2366 # After all data is read, send to _mqtt_handle_packet() to deal with. 2367 # Finally, free the memory and reset everything to starting conditions. 2368 if self._in_packet['command'] == 0: 2369 try: 2370 command = self._sock_recv(1) 2371 except BlockingIOError: 2372 return MQTT_ERR_AGAIN 2373 except ConnectionError as err: 2374 self._easy_log( 2375 MQTT_LOG_ERR, 'failed to receive on socket: %s', err) 2376 return MQTT_ERR_CONN_LOST 2377 else: 2378 if len(command) == 0: 2379 return MQTT_ERR_CONN_LOST 2380 command, = struct.unpack("!B", command) 2381 self._in_packet['command'] = command 2382 2383 if self._in_packet['have_remaining'] == 0: 2384 # Read remaining 2385 # Algorithm for decoding taken from pseudo code at 2386 # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm 2387 while True: 2388 try: 2389 byte = self._sock_recv(1) 2390 except BlockingIOError: 2391 return MQTT_ERR_AGAIN 2392 except ConnectionError as err: 2393 self._easy_log( 2394 MQTT_LOG_ERR, 'failed to receive on socket: %s', err) 2395 return MQTT_ERR_CONN_LOST 2396 else: 2397 if len(byte) == 0: 2398 return MQTT_ERR_CONN_LOST 2399 byte, = struct.unpack("!B", byte) 2400 self._in_packet['remaining_count'].append(byte) 2401 # Max 4 bytes length for remaining length as defined by protocol. 2402 # Anything more likely means a broken/malicious client. 2403 if len(self._in_packet['remaining_count']) > 4: 2404 return MQTT_ERR_PROTOCOL 2405 2406 self._in_packet['remaining_length'] += ( 2407 byte & 127) * self._in_packet['remaining_mult'] 2408 self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128 2409 2410 if (byte & 128) == 0: 2411 break 2412 2413 self._in_packet['have_remaining'] = 1 2414 self._in_packet['to_process'] = self._in_packet['remaining_length'] 2415 2416 count = 100 # Don't get stuck in this loop if we have a huge message. 2417 while self._in_packet['to_process'] > 0: 2418 try: 2419 data = self._sock_recv(self._in_packet['to_process']) 2420 except BlockingIOError: 2421 return MQTT_ERR_AGAIN 2422 except ConnectionError as err: 2423 self._easy_log( 2424 MQTT_LOG_ERR, 'failed to receive on socket: %s', err) 2425 return MQTT_ERR_CONN_LOST 2426 else: 2427 if len(data) == 0: 2428 return MQTT_ERR_CONN_LOST 2429 self._in_packet['to_process'] -= len(data) 2430 self._in_packet['packet'] += data 2431 count -= 1 2432 if count == 0: 2433 with self._msgtime_mutex: 2434 self._last_msg_in = time_func() 2435 return MQTT_ERR_AGAIN 2436 2437 # All data for this packet is read. 2438 self._in_packet['pos'] = 0 2439 rc = self._packet_handle() 2440 2441 # Free data and reset values 2442 self._in_packet = { 2443 'command': 0, 2444 'have_remaining': 0, 2445 'remaining_count': [], 2446 'remaining_mult': 1, 2447 'remaining_length': 0, 2448 'packet': bytearray(b""), 2449 'to_process': 0, 2450 'pos': 0} 2451 2452 with self._msgtime_mutex: 2453 self._last_msg_in = time_func() 2454 return rc 2455 2456 def _packet_write(self): 2457 while True: 2458 try: 2459 packet = self._out_packet.popleft() 2460 except IndexError: 2461 return MQTT_ERR_SUCCESS 2462 2463 try: 2464 write_length = self._sock_send( 2465 packet['packet'][packet['pos']:]) 2466 except (AttributeError, ValueError): 2467 self._out_packet.appendleft(packet) 2468 return MQTT_ERR_SUCCESS 2469 except BlockingIOError: 2470 self._out_packet.appendleft(packet) 2471 return MQTT_ERR_AGAIN 2472 except ConnectionError as err: 2473 self._out_packet.appendleft(packet) 2474 self._easy_log( 2475 MQTT_LOG_ERR, 'failed to receive on socket: %s', err) 2476 return MQTT_ERR_CONN_LOST 2477 2478 if write_length > 0: 2479 packet['to_process'] -= write_length 2480 packet['pos'] += write_length 2481 2482 if packet['to_process'] == 0: 2483 if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: 2484 with self._callback_mutex: 2485 on_publish = self.on_publish 2486 2487 if on_publish: 2488 with self._in_callback_mutex: 2489 try: 2490 on_publish( 2491 self, self._userdata, packet['mid']) 2492 except Exception as err: 2493 self._easy_log( 2494 MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) 2495 if not self.suppress_exceptions: 2496 raise 2497 2498 packet['info']._set_as_published() 2499 2500 if (packet['command'] & 0xF0) == DISCONNECT: 2501 with self._msgtime_mutex: 2502 self._last_msg_out = time_func() 2503 2504 self._do_on_disconnect(MQTT_ERR_SUCCESS) 2505 self._sock_close() 2506 return MQTT_ERR_SUCCESS 2507 2508 else: 2509 # We haven't finished with this packet 2510 self._out_packet.appendleft(packet) 2511 else: 2512 break 2513 2514 with self._msgtime_mutex: 2515 self._last_msg_out = time_func() 2516 2517 return MQTT_ERR_SUCCESS 2518 2519 def _easy_log(self, level, fmt, *args): 2520 if self.on_log is not None: 2521 buf = fmt % args 2522 try: 2523 self.on_log(self, self._userdata, level, buf) 2524 except Exception: 2525 # Can't _easy_log this, as we'll recurse until we break 2526 pass # self._logger will pick this up, so we're fine 2527 if self._logger is not None: 2528 level_std = LOGGING_LEVEL[level] 2529 self._logger.log(level_std, fmt, *args) 2530 2531 def _check_keepalive(self): 2532 if self._keepalive == 0: 2533 return MQTT_ERR_SUCCESS 2534 2535 now = time_func() 2536 2537 with self._msgtime_mutex: 2538 last_msg_out = self._last_msg_out 2539 last_msg_in = self._last_msg_in 2540 2541 if self._sock is not None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): 2542 if self._state == mqtt_cs_connected and self._ping_t == 0: 2543 try: 2544 self._send_pingreq() 2545 except Exception: 2546 self._sock_close() 2547 self._do_on_disconnect(MQTT_ERR_CONN_LOST) 2548 else: 2549 with self._msgtime_mutex: 2550 self._last_msg_out = now 2551 self._last_msg_in = now 2552 else: 2553 self._sock_close() 2554 2555 if self._state == mqtt_cs_disconnecting: 2556 rc = MQTT_ERR_SUCCESS 2557 else: 2558 rc = MQTT_ERR_KEEPALIVE 2559 2560 self._do_on_disconnect(rc) 2561 2562 def _mid_generate(self): 2563 with self._mid_generate_mutex: 2564 self._last_mid += 1 2565 if self._last_mid == 65536: 2566 self._last_mid = 1 2567 return self._last_mid 2568 2569 @staticmethod 2570 def _topic_wildcard_len_check(topic): 2571 # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. 2572 # Also returns MQTT_ERR_INVAL if the topic string is too long. 2573 # Returns MQTT_ERR_SUCCESS if everything is fine. 2574 if b'+' in topic or b'#' in topic or len(topic) > 65535: 2575 return MQTT_ERR_INVAL 2576 else: 2577 return MQTT_ERR_SUCCESS 2578 2579 @staticmethod 2580 def _filter_wildcard_len_check(sub): 2581 if (len(sub) == 0 or len(sub) > 65535 2582 or any(b'+' in p or b'#' in p for p in sub.split(b'/') if len(p) > 1) 2583 or b'#/' in sub): 2584 return MQTT_ERR_INVAL 2585 else: 2586 return MQTT_ERR_SUCCESS 2587 2588 def _send_pingreq(self): 2589 self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") 2590 rc = self._send_simple_command(PINGREQ) 2591 if rc == MQTT_ERR_SUCCESS: 2592 self._ping_t = time_func() 2593 return rc 2594 2595 def _send_pingresp(self): 2596 self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP") 2597 return self._send_simple_command(PINGRESP) 2598 2599 def _send_puback(self, mid): 2600 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: %d)", mid) 2601 return self._send_command_with_mid(PUBACK, mid, False) 2602 2603 def _send_pubcomp(self, mid): 2604 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: %d)", mid) 2605 return self._send_command_with_mid(PUBCOMP, mid, False) 2606 2607 def _pack_remaining_length(self, packet, remaining_length): 2608 remaining_bytes = [] 2609 while True: 2610 byte = remaining_length % 128 2611 remaining_length = remaining_length // 128 2612 # If there are more digits to encode, set the top bit of this digit 2613 if remaining_length > 0: 2614 byte |= 0x80 2615 2616 remaining_bytes.append(byte) 2617 packet.append(byte) 2618 if remaining_length == 0: 2619 # FIXME - this doesn't deal with incorrectly large payloads 2620 return packet 2621 2622 def _pack_str16(self, packet, data): 2623 if isinstance(data, unicode): 2624 data = data.encode('utf-8') 2625 packet.extend(struct.pack("!H", len(data))) 2626 packet.extend(data) 2627 2628 def _send_publish(self, mid, topic, payload=b'', qos=0, retain=False, dup=False, info=None, properties=None): 2629 # we assume that topic and payload are already properly encoded 2630 assert not isinstance(topic, unicode) and not isinstance( 2631 payload, unicode) and payload is not None 2632 2633 if self._sock is None: 2634 return MQTT_ERR_NO_CONN 2635 2636 command = PUBLISH | ((dup & 0x1) << 3) | (qos << 1) | retain 2637 packet = bytearray() 2638 packet.append(command) 2639 2640 payloadlen = len(payload) 2641 remaining_length = 2 + len(topic) + payloadlen 2642 2643 if payloadlen == 0: 2644 if self._protocol == MQTTv5: 2645 self._easy_log( 2646 MQTT_LOG_DEBUG, 2647 "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s (NULL payload)", 2648 dup, qos, retain, mid, topic, properties 2649 ) 2650 else: 2651 self._easy_log( 2652 MQTT_LOG_DEBUG, 2653 "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s' (NULL payload)", 2654 dup, qos, retain, mid, topic 2655 ) 2656 else: 2657 if self._protocol == MQTTv5: 2658 self._easy_log( 2659 MQTT_LOG_DEBUG, 2660 "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s, ... (%d bytes)", 2661 dup, qos, retain, mid, topic, properties, payloadlen 2662 ) 2663 else: 2664 self._easy_log( 2665 MQTT_LOG_DEBUG, 2666 "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', ... (%d bytes)", 2667 dup, qos, retain, mid, topic, payloadlen 2668 ) 2669 2670 if qos > 0: 2671 # For message id 2672 remaining_length += 2 2673 2674 if self._protocol == MQTTv5: 2675 if properties is None: 2676 packed_properties = b'\x00' 2677 else: 2678 packed_properties = properties.pack() 2679 remaining_length += len(packed_properties) 2680 2681 self._pack_remaining_length(packet, remaining_length) 2682 self._pack_str16(packet, topic) 2683 2684 if qos > 0: 2685 # For message id 2686 packet.extend(struct.pack("!H", mid)) 2687 2688 if self._protocol == MQTTv5: 2689 packet.extend(packed_properties) 2690 2691 packet.extend(payload) 2692 2693 return self._packet_queue(PUBLISH, packet, mid, qos, info) 2694 2695 def _send_pubrec(self, mid): 2696 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: %d)", mid) 2697 return self._send_command_with_mid(PUBREC, mid, False) 2698 2699 def _send_pubrel(self, mid): 2700 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: %d)", mid) 2701 return self._send_command_with_mid(PUBREL | 2, mid, False) 2702 2703 def _send_command_with_mid(self, command, mid, dup): 2704 # For PUBACK, PUBCOMP, PUBREC, and PUBREL 2705 if dup: 2706 command |= 0x8 2707 2708 remaining_length = 2 2709 packet = struct.pack('!BBH', command, remaining_length, mid) 2710 return self._packet_queue(command, packet, mid, 1) 2711 2712 def _send_simple_command(self, command): 2713 # For DISCONNECT, PINGREQ and PINGRESP 2714 remaining_length = 0 2715 packet = struct.pack('!BB', command, remaining_length) 2716 return self._packet_queue(command, packet, 0, 0) 2717 2718 def _send_connect(self, keepalive): 2719 proto_ver = self._protocol 2720 # hard-coded UTF-8 encoded string 2721 protocol = b"MQTT" if proto_ver >= MQTTv311 else b"MQIsdp" 2722 2723 remaining_length = 2 + len(protocol) + 1 + \ 2724 1 + 2 + 2 + len(self._client_id) 2725 2726 connect_flags = 0 2727 if self._protocol == MQTTv5: 2728 if self._clean_start == True: 2729 connect_flags |= 0x02 2730 elif self._clean_start == MQTT_CLEAN_START_FIRST_ONLY and self._mqttv5_first_connect: 2731 connect_flags |= 0x02 2732 elif self._clean_session: 2733 connect_flags |= 0x02 2734 2735 if self._will: 2736 remaining_length += 2 + \ 2737 len(self._will_topic) + 2 + len(self._will_payload) 2738 connect_flags |= 0x04 | ((self._will_qos & 0x03) << 3) | ( 2739 (self._will_retain & 0x01) << 5) 2740 2741 if self._username is not None: 2742 remaining_length += 2 + len(self._username) 2743 connect_flags |= 0x80 2744 if self._password is not None: 2745 connect_flags |= 0x40 2746 remaining_length += 2 + len(self._password) 2747 2748 if self._protocol == MQTTv5: 2749 if self._connect_properties is None: 2750 packed_connect_properties = b'\x00' 2751 else: 2752 packed_connect_properties = self._connect_properties.pack() 2753 remaining_length += len(packed_connect_properties) 2754 if self._will: 2755 if self._will_properties is None: 2756 packed_will_properties = b'\x00' 2757 else: 2758 packed_will_properties = self._will_properties.pack() 2759 remaining_length += len(packed_will_properties) 2760 2761 command = CONNECT 2762 packet = bytearray() 2763 packet.append(command) 2764 2765 # as per the mosquitto broker, if the MSB of this version is set 2766 # to 1, then it treats the connection as a bridge 2767 if self._client_mode == MQTT_BRIDGE: 2768 proto_ver |= 0x80 2769 2770 self._pack_remaining_length(packet, remaining_length) 2771 packet.extend(struct.pack("!H" + str(len(protocol)) + "sBBH", len(protocol), protocol, proto_ver, connect_flags, 2772 keepalive)) 2773 2774 if self._protocol == MQTTv5: 2775 packet += packed_connect_properties 2776 2777 self._pack_str16(packet, self._client_id) 2778 2779 if self._will: 2780 if self._protocol == MQTTv5: 2781 packet += packed_will_properties 2782 self._pack_str16(packet, self._will_topic) 2783 self._pack_str16(packet, self._will_payload) 2784 2785 if self._username is not None: 2786 self._pack_str16(packet, self._username) 2787 2788 if self._password is not None: 2789 self._pack_str16(packet, self._password) 2790 2791 self._keepalive = keepalive 2792 if self._protocol == MQTTv5: 2793 self._easy_log( 2794 MQTT_LOG_DEBUG, 2795 "Sending CONNECT (u%d, p%d, wr%d, wq%d, wf%d, c%d, k%d) client_id=%s properties=%s", 2796 (connect_flags & 0x80) >> 7, 2797 (connect_flags & 0x40) >> 6, 2798 (connect_flags & 0x20) >> 5, 2799 (connect_flags & 0x18) >> 3, 2800 (connect_flags & 0x4) >> 2, 2801 (connect_flags & 0x2) >> 1, 2802 keepalive, 2803 self._client_id, 2804 self._connect_properties 2805 ) 2806 else: 2807 self._easy_log( 2808 MQTT_LOG_DEBUG, 2809 "Sending CONNECT (u%d, p%d, wr%d, wq%d, wf%d, c%d, k%d) client_id=%s", 2810 (connect_flags & 0x80) >> 7, 2811 (connect_flags & 0x40) >> 6, 2812 (connect_flags & 0x20) >> 5, 2813 (connect_flags & 0x18) >> 3, 2814 (connect_flags & 0x4) >> 2, 2815 (connect_flags & 0x2) >> 1, 2816 keepalive, 2817 self._client_id 2818 ) 2819 return self._packet_queue(command, packet, 0, 0) 2820 2821 def _send_disconnect(self, reasoncode=None, properties=None): 2822 if self._protocol == MQTTv5: 2823 self._easy_log(MQTT_LOG_DEBUG, "Sending DISCONNECT reasonCode=%s properties=%s", 2824 reasoncode, 2825 properties 2826 ) 2827 else: 2828 self._easy_log(MQTT_LOG_DEBUG, "Sending DISCONNECT") 2829 2830 remaining_length = 0 2831 2832 command = DISCONNECT 2833 packet = bytearray() 2834 packet.append(command) 2835 2836 if self._protocol == MQTTv5: 2837 if properties is not None or reasoncode is not None: 2838 if reasoncode is None: 2839 reasoncode = ReasonCodes(DISCONNECT >> 4, identifier=0) 2840 remaining_length += 1 2841 if properties is not None: 2842 packed_props = properties.pack() 2843 remaining_length += len(packed_props) 2844 2845 self._pack_remaining_length(packet, remaining_length) 2846 2847 if self._protocol == MQTTv5: 2848 if reasoncode != None: 2849 packet += reasoncode.pack() 2850 if properties != None: 2851 packet += packed_props 2852 2853 return self._packet_queue(command, packet, 0, 0) 2854 2855 def _send_subscribe(self, dup, topics, properties=None): 2856 remaining_length = 2 2857 if self._protocol == MQTTv5: 2858 if properties is None: 2859 packed_subscribe_properties = b'\x00' 2860 else: 2861 packed_subscribe_properties = properties.pack() 2862 remaining_length += len(packed_subscribe_properties) 2863 for t, _ in topics: 2864 remaining_length += 2 + len(t) + 1 2865 2866 command = SUBSCRIBE | (dup << 3) | 0x2 2867 packet = bytearray() 2868 packet.append(command) 2869 self._pack_remaining_length(packet, remaining_length) 2870 local_mid = self._mid_generate() 2871 packet.extend(struct.pack("!H", local_mid)) 2872 2873 if self._protocol == MQTTv5: 2874 packet += packed_subscribe_properties 2875 2876 for t, q in topics: 2877 self._pack_str16(packet, t) 2878 if self._protocol == MQTTv5: 2879 packet += q.pack() 2880 else: 2881 packet.append(q) 2882 2883 self._easy_log( 2884 MQTT_LOG_DEBUG, 2885 "Sending SUBSCRIBE (d%d, m%d) %s", 2886 dup, 2887 local_mid, 2888 topics, 2889 ) 2890 return (self._packet_queue(command, packet, local_mid, 1), local_mid) 2891 2892 def _send_unsubscribe(self, dup, topics, properties=None): 2893 remaining_length = 2 2894 if self._protocol == MQTTv5: 2895 if properties is None: 2896 packed_unsubscribe_properties = b'\x00' 2897 else: 2898 packed_unsubscribe_properties = properties.pack() 2899 remaining_length += len(packed_unsubscribe_properties) 2900 for t in topics: 2901 remaining_length += 2 + len(t) 2902 2903 command = UNSUBSCRIBE | (dup << 3) | 0x2 2904 packet = bytearray() 2905 packet.append(command) 2906 self._pack_remaining_length(packet, remaining_length) 2907 local_mid = self._mid_generate() 2908 packet.extend(struct.pack("!H", local_mid)) 2909 2910 if self._protocol == MQTTv5: 2911 packet += packed_unsubscribe_properties 2912 2913 for t in topics: 2914 self._pack_str16(packet, t) 2915 2916 # topics_repr = ", ".join("'"+topic.decode('utf8')+"'" for topic in topics) 2917 if self._protocol == MQTTv5: 2918 self._easy_log( 2919 MQTT_LOG_DEBUG, 2920 "Sending UNSUBSCRIBE (d%d, m%d) %s %s", 2921 dup, 2922 local_mid, 2923 properties, 2924 topics, 2925 ) 2926 else: 2927 self._easy_log( 2928 MQTT_LOG_DEBUG, 2929 "Sending UNSUBSCRIBE (d%d, m%d) %s", 2930 dup, 2931 local_mid, 2932 topics, 2933 ) 2934 return (self._packet_queue(command, packet, local_mid, 1), local_mid) 2935 2936 def _check_clean_session(self): 2937 if self._protocol == MQTTv5: 2938 if self._clean_start == MQTT_CLEAN_START_FIRST_ONLY: 2939 return self._mqttv5_first_connect 2940 else: 2941 return self._clean_start 2942 else: 2943 return self._clean_session 2944 2945 def _messages_reconnect_reset_out(self): 2946 with self._out_message_mutex: 2947 self._inflight_messages = 0 2948 for m in self._out_messages.values(): 2949 m.timestamp = 0 2950 if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: 2951 if m.qos == 0: 2952 m.state = mqtt_ms_publish 2953 elif m.qos == 1: 2954 # self._inflight_messages = self._inflight_messages + 1 2955 if m.state == mqtt_ms_wait_for_puback: 2956 m.dup = True 2957 m.state = mqtt_ms_publish 2958 elif m.qos == 2: 2959 # self._inflight_messages = self._inflight_messages + 1 2960 if self._check_clean_session(): 2961 if m.state != mqtt_ms_publish: 2962 m.dup = True 2963 m.state = mqtt_ms_publish 2964 else: 2965 if m.state == mqtt_ms_wait_for_pubcomp: 2966 m.state = mqtt_ms_resend_pubrel 2967 else: 2968 if m.state == mqtt_ms_wait_for_pubrec: 2969 m.dup = True 2970 m.state = mqtt_ms_publish 2971 else: 2972 m.state = mqtt_ms_queued 2973 2974 def _messages_reconnect_reset_in(self): 2975 with self._in_message_mutex: 2976 if self._check_clean_session(): 2977 self._in_messages = collections.OrderedDict() 2978 return 2979 for m in self._in_messages.values(): 2980 m.timestamp = 0 2981 if m.qos != 2: 2982 self._in_messages.pop(m.mid) 2983 else: 2984 # Preserve current state 2985 pass 2986 2987 def _messages_reconnect_reset(self): 2988 self._messages_reconnect_reset_out() 2989 self._messages_reconnect_reset_in() 2990 2991 def _packet_queue(self, command, packet, mid, qos, info=None): 2992 mpkt = { 2993 'command': command, 2994 'mid': mid, 2995 'qos': qos, 2996 'pos': 0, 2997 'to_process': len(packet), 2998 'packet': packet, 2999 'info': info} 3000 3001 self._out_packet.append(mpkt) 3002 3003 # Write a single byte to sockpairW (connected to sockpairR) to break 3004 # out of select() if in threaded mode. 3005 if self._sockpairW is not None: 3006 try: 3007 self._sockpairW.send(sockpair_data) 3008 except BlockingIOError: 3009 pass 3010 3011 # If we have an external event loop registered, use that instead 3012 # of calling loop_write() directly. 3013 if self._thread is None and self._on_socket_register_write is None: 3014 if self._in_callback_mutex.acquire(False): 3015 self._in_callback_mutex.release() 3016 return self.loop_write() 3017 3018 self._call_socket_register_write() 3019 3020 return MQTT_ERR_SUCCESS 3021 3022 def _packet_handle(self): 3023 cmd = self._in_packet['command'] & 0xF0 3024 if cmd == PINGREQ: 3025 return self._handle_pingreq() 3026 elif cmd == PINGRESP: 3027 return self._handle_pingresp() 3028 elif cmd == PUBACK: 3029 return self._handle_pubackcomp("PUBACK") 3030 elif cmd == PUBCOMP: 3031 return self._handle_pubackcomp("PUBCOMP") 3032 elif cmd == PUBLISH: 3033 return self._handle_publish() 3034 elif cmd == PUBREC: 3035 return self._handle_pubrec() 3036 elif cmd == PUBREL: 3037 return self._handle_pubrel() 3038 elif cmd == CONNACK: 3039 return self._handle_connack() 3040 elif cmd == SUBACK: 3041 return self._handle_suback() 3042 elif cmd == UNSUBACK: 3043 return self._handle_unsuback() 3044 elif cmd == DISCONNECT and self._protocol == MQTTv5: # only allowed in MQTT 5.0 3045 return self._handle_disconnect() 3046 else: 3047 # If we don't recognise the command, return an error straight away. 3048 self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command %s", cmd) 3049 return MQTT_ERR_PROTOCOL 3050 3051 def _handle_pingreq(self): 3052 if self._in_packet['remaining_length'] != 0: 3053 return MQTT_ERR_PROTOCOL 3054 3055 self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ") 3056 return self._send_pingresp() 3057 3058 def _handle_pingresp(self): 3059 if self._in_packet['remaining_length'] != 0: 3060 return MQTT_ERR_PROTOCOL 3061 3062 # No longer waiting for a PINGRESP. 3063 self._ping_t = 0 3064 self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP") 3065 return MQTT_ERR_SUCCESS 3066 3067 def _handle_connack(self): 3068 if self._protocol == MQTTv5: 3069 if self._in_packet['remaining_length'] < 2: 3070 return MQTT_ERR_PROTOCOL 3071 elif self._in_packet['remaining_length'] != 2: 3072 return MQTT_ERR_PROTOCOL 3073 3074 if self._protocol == MQTTv5: 3075 (flags, result) = struct.unpack( 3076 "!BB", self._in_packet['packet'][:2]) 3077 if result == 1: 3078 # This is probably a failure from a broker that doesn't support 3079 # MQTT v5. 3080 reason = 132 # Unsupported protocol version 3081 properties = None 3082 else: 3083 reason = ReasonCodes(CONNACK >> 4, identifier=result) 3084 properties = Properties(CONNACK >> 4) 3085 properties.unpack(self._in_packet['packet'][2:]) 3086 else: 3087 (flags, result) = struct.unpack("!BB", self._in_packet['packet']) 3088 if self._protocol == MQTTv311: 3089 if result == CONNACK_REFUSED_PROTOCOL_VERSION: 3090 if not self._reconnect_on_failure: 3091 return MQTT_ERR_PROTOCOL 3092 self._easy_log( 3093 MQTT_LOG_DEBUG, 3094 "Received CONNACK (%s, %s), attempting downgrade to MQTT v3.1.", 3095 flags, result 3096 ) 3097 # Downgrade to MQTT v3.1 3098 self._protocol = MQTTv31 3099 return self.reconnect() 3100 elif (result == CONNACK_REFUSED_IDENTIFIER_REJECTED 3101 and self._client_id == b''): 3102 if not self._reconnect_on_failure: 3103 return MQTT_ERR_PROTOCOL 3104 self._easy_log( 3105 MQTT_LOG_DEBUG, 3106 "Received CONNACK (%s, %s), attempting to use non-empty CID", 3107 flags, result, 3108 ) 3109 self._client_id = base62(uuid.uuid4().int, padding=22) 3110 return self.reconnect() 3111 3112 if result == 0: 3113 self._state = mqtt_cs_connected 3114 self._reconnect_delay = None 3115 3116 if self._protocol == MQTTv5: 3117 self._easy_log( 3118 MQTT_LOG_DEBUG, "Received CONNACK (%s, %s) properties=%s", flags, reason, properties) 3119 else: 3120 self._easy_log( 3121 MQTT_LOG_DEBUG, "Received CONNACK (%s, %s)", flags, result) 3122 3123 # it won't be the first successful connect any more 3124 self._mqttv5_first_connect = False 3125 3126 with self._callback_mutex: 3127 on_connect = self.on_connect 3128 3129 if on_connect: 3130 flags_dict = {} 3131 flags_dict['session present'] = flags & 0x01 3132 with self._in_callback_mutex: 3133 try: 3134 if self._protocol == MQTTv5: 3135 on_connect(self, self._userdata, 3136 flags_dict, reason, properties) 3137 else: 3138 on_connect( 3139 self, self._userdata, flags_dict, result) 3140 except Exception as err: 3141 self._easy_log( 3142 MQTT_LOG_ERR, 'Caught exception in on_connect: %s', err) 3143 if not self.suppress_exceptions: 3144 raise 3145 3146 if result == 0: 3147 rc = 0 3148 with self._out_message_mutex: 3149 for m in self._out_messages.values(): 3150 m.timestamp = time_func() 3151 if m.state == mqtt_ms_queued: 3152 self.loop_write() # Process outgoing messages that have just been queued up 3153 return MQTT_ERR_SUCCESS 3154 3155 if m.qos == 0: 3156 with self._in_callback_mutex: # Don't call loop_write after _send_publish() 3157 rc = self._send_publish( 3158 m.mid, 3159 m.topic.encode('utf-8'), 3160 m.payload, 3161 m.qos, 3162 m.retain, 3163 m.dup, 3164 properties=m.properties 3165 ) 3166 if rc != 0: 3167 return rc 3168 elif m.qos == 1: 3169 if m.state == mqtt_ms_publish: 3170 self._inflight_messages += 1 3171 m.state = mqtt_ms_wait_for_puback 3172 with self._in_callback_mutex: # Don't call loop_write after _send_publish() 3173 rc = self._send_publish( 3174 m.mid, 3175 m.topic.encode('utf-8'), 3176 m.payload, 3177 m.qos, 3178 m.retain, 3179 m.dup, 3180 properties=m.properties 3181 ) 3182 if rc != 0: 3183 return rc 3184 elif m.qos == 2: 3185 if m.state == mqtt_ms_publish: 3186 self._inflight_messages += 1 3187 m.state = mqtt_ms_wait_for_pubrec 3188 with self._in_callback_mutex: # Don't call loop_write after _send_publish() 3189 rc = self._send_publish( 3190 m.mid, 3191 m.topic.encode('utf-8'), 3192 m.payload, 3193 m.qos, 3194 m.retain, 3195 m.dup, 3196 properties=m.properties 3197 ) 3198 if rc != 0: 3199 return rc 3200 elif m.state == mqtt_ms_resend_pubrel: 3201 self._inflight_messages += 1 3202 m.state = mqtt_ms_wait_for_pubcomp 3203 with self._in_callback_mutex: # Don't call loop_write after _send_publish() 3204 rc = self._send_pubrel(m.mid) 3205 if rc != 0: 3206 return rc 3207 self.loop_write() # Process outgoing messages that have just been queued up 3208 3209 return rc 3210 elif result > 0 and result < 6: 3211 return MQTT_ERR_CONN_REFUSED 3212 else: 3213 return MQTT_ERR_PROTOCOL 3214 3215 def _handle_disconnect(self): 3216 packet_type = DISCONNECT >> 4 3217 reasonCode = properties = None 3218 if self._in_packet['remaining_length'] > 2: 3219 reasonCode = ReasonCodes(packet_type) 3220 reasonCode.unpack(self._in_packet['packet']) 3221 if self._in_packet['remaining_length'] > 3: 3222 properties = Properties(packet_type) 3223 props, props_len = properties.unpack( 3224 self._in_packet['packet'][1:]) 3225 self._easy_log(MQTT_LOG_DEBUG, "Received DISCONNECT %s %s", 3226 reasonCode, 3227 properties 3228 ) 3229 3230 self._loop_rc_handle(reasonCode, properties) 3231 3232 return MQTT_ERR_SUCCESS 3233 3234 def _handle_suback(self): 3235 self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") 3236 pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's' 3237 (mid, packet) = struct.unpack(pack_format, self._in_packet['packet']) 3238 3239 if self._protocol == MQTTv5: 3240 properties = Properties(SUBACK >> 4) 3241 props, props_len = properties.unpack(packet) 3242 reasoncodes = [] 3243 for c in packet[props_len:]: 3244 if sys.version_info[0] < 3: 3245 c = ord(c) 3246 reasoncodes.append(ReasonCodes(SUBACK >> 4, identifier=c)) 3247 else: 3248 pack_format = "!" + "B" * len(packet) 3249 granted_qos = struct.unpack(pack_format, packet) 3250 3251 with self._callback_mutex: 3252 on_subscribe = self.on_subscribe 3253 3254 if on_subscribe: 3255 with self._in_callback_mutex: # Don't call loop_write after _send_publish() 3256 try: 3257 if self._protocol == MQTTv5: 3258 on_subscribe( 3259 self, self._userdata, mid, reasoncodes, properties) 3260 else: 3261 on_subscribe( 3262 self, self._userdata, mid, granted_qos) 3263 except Exception as err: 3264 self._easy_log( 3265 MQTT_LOG_ERR, 'Caught exception in on_subscribe: %s', err) 3266 if not self.suppress_exceptions: 3267 raise 3268 3269 return MQTT_ERR_SUCCESS 3270 3271 def _handle_publish(self): 3272 rc = 0 3273 3274 header = self._in_packet['command'] 3275 message = MQTTMessage() 3276 message.dup = (header & 0x08) >> 3 3277 message.qos = (header & 0x06) >> 1 3278 message.retain = (header & 0x01) 3279 3280 pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's' 3281 (slen, packet) = struct.unpack(pack_format, self._in_packet['packet']) 3282 pack_format = '!' + str(slen) + 's' + str(len(packet) - slen) + 's' 3283 (topic, packet) = struct.unpack(pack_format, packet) 3284 3285 if self._protocol != MQTTv5 and len(topic) == 0: 3286 return MQTT_ERR_PROTOCOL 3287 3288 # Handle topics with invalid UTF-8 3289 # This replaces an invalid topic with a message and the hex 3290 # representation of the topic for logging. When the user attempts to 3291 # access message.topic in the callback, an exception will be raised. 3292 try: 3293 print_topic = topic.decode('utf-8') 3294 except UnicodeDecodeError: 3295 print_topic = "TOPIC WITH INVALID UTF-8: " + str(topic) 3296 3297 message.topic = topic 3298 3299 if message.qos > 0: 3300 pack_format = "!H" + str(len(packet) - 2) + 's' 3301 (message.mid, packet) = struct.unpack(pack_format, packet) 3302 3303 if self._protocol == MQTTv5: 3304 message.properties = Properties(PUBLISH >> 4) 3305 props, props_len = message.properties.unpack(packet) 3306 packet = packet[props_len:] 3307 3308 message.payload = packet 3309 3310 if self._protocol == MQTTv5: 3311 self._easy_log( 3312 MQTT_LOG_DEBUG, 3313 "Received PUBLISH (d%d, q%d, r%d, m%d), '%s', properties=%s, ... (%d bytes)", 3314 message.dup, message.qos, message.retain, message.mid, 3315 print_topic, message.properties, len(message.payload) 3316 ) 3317 else: 3318 self._easy_log( 3319 MQTT_LOG_DEBUG, 3320 "Received PUBLISH (d%d, q%d, r%d, m%d), '%s', ... (%d bytes)", 3321 message.dup, message.qos, message.retain, message.mid, 3322 print_topic, len(message.payload) 3323 ) 3324 3325 message.timestamp = time_func() 3326 if message.qos == 0: 3327 self._handle_on_message(message) 3328 return MQTT_ERR_SUCCESS 3329 elif message.qos == 1: 3330 self._handle_on_message(message) 3331 return self._send_puback(message.mid) 3332 elif message.qos == 2: 3333 rc = self._send_pubrec(message.mid) 3334 message.state = mqtt_ms_wait_for_pubrel 3335 with self._in_message_mutex: 3336 self._in_messages[message.mid] = message 3337 return rc 3338 else: 3339 return MQTT_ERR_PROTOCOL 3340 3341 def _handle_pubrel(self): 3342 if self._protocol == MQTTv5: 3343 if self._in_packet['remaining_length'] < 2: 3344 return MQTT_ERR_PROTOCOL 3345 elif self._in_packet['remaining_length'] != 2: 3346 return MQTT_ERR_PROTOCOL 3347 3348 mid, = struct.unpack("!H", self._in_packet['packet']) 3349 self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: %d)", mid) 3350 3351 with self._in_message_mutex: 3352 if mid in self._in_messages: 3353 # Only pass the message on if we have removed it from the queue - this 3354 # prevents multiple callbacks for the same message. 3355 message = self._in_messages.pop(mid) 3356 self._handle_on_message(message) 3357 self._inflight_messages -= 1 3358 if self._max_inflight_messages > 0: 3359 with self._out_message_mutex: 3360 rc = self._update_inflight() 3361 if rc != MQTT_ERR_SUCCESS: 3362 return rc 3363 3364 # FIXME: this should only be done if the message is known 3365 # If unknown it's a protocol error and we should close the connection. 3366 # But since we don't have (on disk) persistence for the session, it 3367 # is possible that we must known about this message. 3368 # Choose to acknwoledge this messsage (and thus losing a message) but 3369 # avoid hanging. See #284. 3370 return self._send_pubcomp(mid) 3371 3372 def _update_inflight(self): 3373 # Dont lock message_mutex here 3374 for m in self._out_messages.values(): 3375 if self._inflight_messages < self._max_inflight_messages: 3376 if m.qos > 0 and m.state == mqtt_ms_queued: 3377 self._inflight_messages += 1 3378 if m.qos == 1: 3379 m.state = mqtt_ms_wait_for_puback 3380 elif m.qos == 2: 3381 m.state = mqtt_ms_wait_for_pubrec 3382 rc = self._send_publish( 3383 m.mid, 3384 m.topic.encode('utf-8'), 3385 m.payload, 3386 m.qos, 3387 m.retain, 3388 m.dup, 3389 properties=m.properties, 3390 ) 3391 if rc != 0: 3392 return rc 3393 else: 3394 return MQTT_ERR_SUCCESS 3395 return MQTT_ERR_SUCCESS 3396 3397 def _handle_pubrec(self): 3398 if self._protocol == MQTTv5: 3399 if self._in_packet['remaining_length'] < 2: 3400 return MQTT_ERR_PROTOCOL 3401 elif self._in_packet['remaining_length'] != 2: 3402 return MQTT_ERR_PROTOCOL 3403 3404 mid, = struct.unpack("!H", self._in_packet['packet'][:2]) 3405 if self._protocol == MQTTv5: 3406 if self._in_packet['remaining_length'] > 2: 3407 reasonCode = ReasonCodes(PUBREC >> 4) 3408 reasonCode.unpack(self._in_packet['packet'][2:]) 3409 if self._in_packet['remaining_length'] > 3: 3410 properties = Properties(PUBREC >> 4) 3411 props, props_len = properties.unpack( 3412 self._in_packet['packet'][3:]) 3413 self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: %d)", mid) 3414 3415 with self._out_message_mutex: 3416 if mid in self._out_messages: 3417 msg = self._out_messages[mid] 3418 msg.state = mqtt_ms_wait_for_pubcomp 3419 msg.timestamp = time_func() 3420 return self._send_pubrel(mid) 3421 3422 return MQTT_ERR_SUCCESS 3423 3424 def _handle_unsuback(self): 3425 if self._protocol == MQTTv5: 3426 if self._in_packet['remaining_length'] < 4: 3427 return MQTT_ERR_PROTOCOL 3428 elif self._in_packet['remaining_length'] != 2: 3429 return MQTT_ERR_PROTOCOL 3430 3431 mid, = struct.unpack("!H", self._in_packet['packet'][:2]) 3432 if self._protocol == MQTTv5: 3433 packet = self._in_packet['packet'][2:] 3434 properties = Properties(UNSUBACK >> 4) 3435 props, props_len = properties.unpack(packet) 3436 reasoncodes = [] 3437 for c in packet[props_len:]: 3438 if sys.version_info[0] < 3: 3439 c = ord(c) 3440 reasoncodes.append(ReasonCodes(UNSUBACK >> 4, identifier=c)) 3441 if len(reasoncodes) == 1: 3442 reasoncodes = reasoncodes[0] 3443 3444 self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid) 3445 with self._callback_mutex: 3446 on_unsubscribe = self.on_unsubscribe 3447 3448 if on_unsubscribe: 3449 with self._in_callback_mutex: 3450 try: 3451 if self._protocol == MQTTv5: 3452 on_unsubscribe( 3453 self, self._userdata, mid, properties, reasoncodes) 3454 else: 3455 on_unsubscribe(self, self._userdata, mid) 3456 except Exception as err: 3457 self._easy_log( 3458 MQTT_LOG_ERR, 'Caught exception in on_unsubscribe: %s', err) 3459 if not self.suppress_exceptions: 3460 raise 3461 3462 return MQTT_ERR_SUCCESS 3463 3464 def _do_on_disconnect(self, rc, properties=None): 3465 with self._callback_mutex: 3466 on_disconnect = self.on_disconnect 3467 3468 if on_disconnect: 3469 with self._in_callback_mutex: 3470 try: 3471 if self._protocol == MQTTv5: 3472 on_disconnect( 3473 self, self._userdata, rc, properties) 3474 else: 3475 on_disconnect(self, self._userdata, rc) 3476 except Exception as err: 3477 self._easy_log( 3478 MQTT_LOG_ERR, 'Caught exception in on_disconnect: %s', err) 3479 if not self.suppress_exceptions: 3480 raise 3481 3482 def _do_on_publish(self, mid): 3483 with self._callback_mutex: 3484 on_publish = self.on_publish 3485 3486 if on_publish: 3487 with self._in_callback_mutex: 3488 try: 3489 on_publish(self, self._userdata, mid) 3490 except Exception as err: 3491 self._easy_log( 3492 MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) 3493 if not self.suppress_exceptions: 3494 raise 3495 3496 msg = self._out_messages.pop(mid) 3497 msg.info._set_as_published() 3498 if msg.qos > 0: 3499 self._inflight_messages -= 1 3500 if self._max_inflight_messages > 0: 3501 rc = self._update_inflight() 3502 if rc != MQTT_ERR_SUCCESS: 3503 return rc 3504 return MQTT_ERR_SUCCESS 3505 3506 def _handle_pubackcomp(self, cmd): 3507 if self._protocol == MQTTv5: 3508 if self._in_packet['remaining_length'] < 2: 3509 return MQTT_ERR_PROTOCOL 3510 elif self._in_packet['remaining_length'] != 2: 3511 return MQTT_ERR_PROTOCOL 3512 3513 packet_type = PUBACK if cmd == "PUBACK" else PUBCOMP 3514 packet_type = packet_type >> 4 3515 mid, = struct.unpack("!H", self._in_packet['packet'][:2]) 3516 if self._protocol == MQTTv5: 3517 if self._in_packet['remaining_length'] > 2: 3518 reasonCode = ReasonCodes(packet_type) 3519 reasonCode.unpack(self._in_packet['packet'][2:]) 3520 if self._in_packet['remaining_length'] > 3: 3521 properties = Properties(packet_type) 3522 props, props_len = properties.unpack( 3523 self._in_packet['packet'][3:]) 3524 self._easy_log(MQTT_LOG_DEBUG, "Received %s (Mid: %d)", cmd, mid) 3525 3526 with self._out_message_mutex: 3527 if mid in self._out_messages: 3528 # Only inform the client the message has been sent once. 3529 rc = self._do_on_publish(mid) 3530 return rc 3531 3532 return MQTT_ERR_SUCCESS 3533 3534 def _handle_on_message(self, message): 3535 matched = False 3536 3537 try: 3538 topic = message.topic 3539 except UnicodeDecodeError: 3540 topic = None 3541 3542 on_message_callbacks = [] 3543 with self._callback_mutex: 3544 if topic is not None: 3545 for callback in self._on_message_filtered.iter_match(message.topic): 3546 on_message_callbacks.append(callback) 3547 3548 if len(on_message_callbacks) == 0: 3549 on_message = self.on_message 3550 else: 3551 on_message = None 3552 3553 for callback in on_message_callbacks: 3554 with self._in_callback_mutex: 3555 try: 3556 callback(self, self._userdata, message) 3557 except Exception as err: 3558 self._easy_log( 3559 MQTT_LOG_ERR, 3560 'Caught exception in user defined callback function %s: %s', 3561 callback.__name__, 3562 err 3563 ) 3564 if not self.suppress_exceptions: 3565 raise 3566 3567 if on_message: 3568 with self._in_callback_mutex: 3569 try: 3570 on_message(self, self._userdata, message) 3571 except Exception as err: 3572 self._easy_log( 3573 MQTT_LOG_ERR, 'Caught exception in on_message: %s', err) 3574 if not self.suppress_exceptions: 3575 raise 3576 3577 3578 def _handle_on_connect_fail(self): 3579 with self._callback_mutex: 3580 on_connect_fail = self.on_connect_fail 3581 3582 if on_connect_fail: 3583 with self._in_callback_mutex: 3584 try: 3585 on_connect_fail(self, self._userdata) 3586 except Exception as err: 3587 self._easy_log( 3588 MQTT_LOG_ERR, 'Caught exception in on_connect_fail: %s', err) 3589 3590 def _thread_main(self): 3591 self.loop_forever(retry_first_connection=True) 3592 3593 def _reconnect_wait(self): 3594 # See reconnect_delay_set for details 3595 now = time_func() 3596 with self._reconnect_delay_mutex: 3597 if self._reconnect_delay is None: 3598 self._reconnect_delay = self._reconnect_min_delay 3599 else: 3600 self._reconnect_delay = min( 3601 self._reconnect_delay * 2, 3602 self._reconnect_max_delay, 3603 ) 3604 3605 target_time = now + self._reconnect_delay 3606 3607 remaining = target_time - now 3608 while (self._state != mqtt_cs_disconnecting 3609 and not self._thread_terminate 3610 and remaining > 0): 3611 3612 time.sleep(min(remaining, 1)) 3613 remaining = target_time - time_func() 3614 3615 @staticmethod 3616 def _proxy_is_valid(p): 3617 def check(t, a): 3618 return (socks is not None and 3619 t in set([socks.HTTP, socks.SOCKS4, socks.SOCKS5]) and a) 3620 3621 if isinstance(p, dict): 3622 return check(p.get("proxy_type"), p.get("proxy_addr")) 3623 elif isinstance(p, (list, tuple)): 3624 return len(p) == 6 and check(p[0], p[1]) 3625 else: 3626 return False 3627 3628 def _get_proxy(self): 3629 if socks is None: 3630 return None 3631 3632 # First, check if the user explicitly passed us a proxy to use 3633 if self._proxy_is_valid(self._proxy): 3634 return self._proxy 3635 3636 # Next, check for an mqtt_proxy environment variable as long as the host 3637 # we're trying to connect to isn't listed under the no_proxy environment 3638 # variable (matches built-in module urllib's behavior) 3639 if not (hasattr(urllib_dot_request, "proxy_bypass") and 3640 urllib_dot_request.proxy_bypass(self._host)): 3641 env_proxies = urllib_dot_request.getproxies() 3642 if "mqtt" in env_proxies: 3643 parts = urllib_dot_parse.urlparse(env_proxies["mqtt"]) 3644 if parts.scheme == "http": 3645 proxy = { 3646 "proxy_type": socks.HTTP, 3647 "proxy_addr": parts.hostname, 3648 "proxy_port": parts.port 3649 } 3650 return proxy 3651 elif parts.scheme == "socks": 3652 proxy = { 3653 "proxy_type": socks.SOCKS5, 3654 "proxy_addr": parts.hostname, 3655 "proxy_port": parts.port 3656 } 3657 return proxy 3658 3659 # Finally, check if the user has monkeypatched the PySocks library with 3660 # a default proxy 3661 socks_default = socks.get_default_proxy() 3662 if self._proxy_is_valid(socks_default): 3663 proxy_keys = ("proxy_type", "proxy_addr", "proxy_port", 3664 "proxy_rdns", "proxy_username", "proxy_password") 3665 return dict(zip(proxy_keys, socks_default)) 3666 3667 # If we didn't find a proxy through any of the above methods, return 3668 # None to indicate that the connection should be handled normally 3669 return None 3670 3671 def _create_socket_connection(self): 3672 proxy = self._get_proxy() 3673 addr = (self._host, self._port) 3674 source = (self._bind_address, self._bind_port) 3675 3676 3677 if sys.version_info < (2, 7) or (3, 0) < sys.version_info < (3, 2): 3678 # Have to short-circuit here because of unsupported source_address 3679 # param in earlier Python versions. 3680 return socket.create_connection(addr, timeout=self._connect_timeout) 3681 3682 if proxy: 3683 return socks.create_connection(addr, timeout=self._connect_timeout, source_address=source, **proxy) 3684 else: 3685 return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source) 3686 3687 3688class WebsocketWrapper(object): 3689 OPCODE_CONTINUATION = 0x0 3690 OPCODE_TEXT = 0x1 3691 OPCODE_BINARY = 0x2 3692 OPCODE_CONNCLOSE = 0x8 3693 OPCODE_PING = 0x9 3694 OPCODE_PONG = 0xa 3695 3696 def __init__(self, socket, host, port, is_ssl, path, extra_headers): 3697 3698 self.connected = False 3699 3700 self._ssl = is_ssl 3701 self._host = host 3702 self._port = port 3703 self._socket = socket 3704 self._path = path 3705 3706 self._sendbuffer = bytearray() 3707 self._readbuffer = bytearray() 3708 3709 self._requested_size = 0 3710 self._payload_head = 0 3711 self._readbuffer_head = 0 3712 3713 self._do_handshake(extra_headers) 3714 3715 def __del__(self): 3716 3717 self._sendbuffer = None 3718 self._readbuffer = None 3719 3720 def _do_handshake(self, extra_headers): 3721 3722 sec_websocket_key = uuid.uuid4().bytes 3723 sec_websocket_key = base64.b64encode(sec_websocket_key) 3724 3725 websocket_headers = { 3726 "Host": "{self._host:s}:{self._port:d}".format(self=self), 3727 "Upgrade": "websocket", 3728 "Connection": "Upgrade", 3729 "Origin": "https://{self._host:s}:{self._port:d}".format(self=self), 3730 "Sec-WebSocket-Key": sec_websocket_key.decode("utf8"), 3731 "Sec-Websocket-Version": "13", 3732 "Sec-Websocket-Protocol": "mqtt", 3733 } 3734 3735 # This is checked in ws_set_options so it will either be None, a 3736 # dictionary, or a callable 3737 if isinstance(extra_headers, dict): 3738 websocket_headers.update(extra_headers) 3739 elif callable(extra_headers): 3740 websocket_headers = extra_headers(websocket_headers) 3741 3742 header = "\r\n".join([ 3743 "GET {self._path} HTTP/1.1".format(self=self), 3744 "\r\n".join("{}: {}".format(i, j) 3745 for i, j in websocket_headers.items()), 3746 "\r\n", 3747 ]).encode("utf8") 3748 3749 self._socket.send(header) 3750 3751 has_secret = False 3752 has_upgrade = False 3753 3754 while True: 3755 # read HTTP response header as lines 3756 byte = self._socket.recv(1) 3757 3758 self._readbuffer.extend(byte) 3759 3760 # line end 3761 if byte == b"\n": 3762 if len(self._readbuffer) > 2: 3763 # check upgrade 3764 if b"connection" in str(self._readbuffer).lower().encode('utf-8'): 3765 if b"upgrade" not in str(self._readbuffer).lower().encode('utf-8'): 3766 raise WebsocketConnectionError( 3767 "WebSocket handshake error, connection not upgraded") 3768 else: 3769 has_upgrade = True 3770 3771 # check key hash 3772 if b"sec-websocket-accept" in str(self._readbuffer).lower().encode('utf-8'): 3773 GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" 3774 3775 server_hash = self._readbuffer.decode( 3776 'utf-8').split(": ", 1)[1] 3777 server_hash = server_hash.strip().encode('utf-8') 3778 3779 client_hash = sec_websocket_key.decode('utf-8') + GUID 3780 client_hash = hashlib.sha1(client_hash.encode('utf-8')) 3781 client_hash = base64.b64encode(client_hash.digest()) 3782 3783 if server_hash != client_hash: 3784 raise WebsocketConnectionError( 3785 "WebSocket handshake error, invalid secret key") 3786 else: 3787 has_secret = True 3788 else: 3789 # ending linebreak 3790 break 3791 3792 # reset linebuffer 3793 self._readbuffer = bytearray() 3794 3795 # connection reset 3796 elif not byte: 3797 raise WebsocketConnectionError("WebSocket handshake error") 3798 3799 if not has_upgrade or not has_secret: 3800 raise WebsocketConnectionError("WebSocket handshake error") 3801 3802 self._readbuffer = bytearray() 3803 self.connected = True 3804 3805 def _create_frame(self, opcode, data, do_masking=1): 3806 3807 header = bytearray() 3808 length = len(data) 3809 3810 mask_key = bytearray(os.urandom(4)) 3811 mask_flag = do_masking 3812 3813 # 1 << 7 is the final flag, we don't send continuated data 3814 header.append(1 << 7 | opcode) 3815 3816 if length < 126: 3817 header.append(mask_flag << 7 | length) 3818 3819 elif length < 65536: 3820 header.append(mask_flag << 7 | 126) 3821 header += struct.pack("!H", length) 3822 3823 elif length < 0x8000000000000001: 3824 header.append(mask_flag << 7 | 127) 3825 header += struct.pack("!Q", length) 3826 3827 else: 3828 raise ValueError("Maximum payload size is 2^63") 3829 3830 if mask_flag == 1: 3831 for index in range(length): 3832 data[index] ^= mask_key[index % 4] 3833 data = mask_key + data 3834 3835 return header + data 3836 3837 def _buffered_read(self, length): 3838 3839 # try to recv and store needed bytes 3840 wanted_bytes = length - (len(self._readbuffer) - self._readbuffer_head) 3841 if wanted_bytes > 0: 3842 3843 data = self._socket.recv(wanted_bytes) 3844 3845 if not data: 3846 raise ConnectionAbortedError 3847 else: 3848 self._readbuffer.extend(data) 3849 3850 if len(data) < wanted_bytes: 3851 raise BlockingIOError 3852 3853 self._readbuffer_head += length 3854 return self._readbuffer[self._readbuffer_head - length:self._readbuffer_head] 3855 3856 def _recv_impl(self, length): 3857 3858 # try to decode websocket payload part from data 3859 try: 3860 3861 self._readbuffer_head = 0 3862 3863 result = None 3864 3865 chunk_startindex = self._payload_head 3866 chunk_endindex = self._payload_head + length 3867 3868 header1 = self._buffered_read(1) 3869 header2 = self._buffered_read(1) 3870 3871 opcode = (header1[0] & 0x0f) 3872 maskbit = (header2[0] & 0x80) == 0x80 3873 lengthbits = (header2[0] & 0x7f) 3874 payload_length = lengthbits 3875 mask_key = None 3876 3877 # read length 3878 if lengthbits == 0x7e: 3879 3880 value = self._buffered_read(2) 3881 payload_length, = struct.unpack("!H", value) 3882 3883 elif lengthbits == 0x7f: 3884 3885 value = self._buffered_read(8) 3886 payload_length, = struct.unpack("!Q", value) 3887 3888 # read mask 3889 if maskbit: 3890 mask_key = self._buffered_read(4) 3891 3892 # if frame payload is shorter than the requested data, read only the possible part 3893 readindex = chunk_endindex 3894 if payload_length < readindex: 3895 readindex = payload_length 3896 3897 if readindex > 0: 3898 # get payload chunk 3899 payload = self._buffered_read(readindex) 3900 3901 # unmask only the needed part 3902 if maskbit: 3903 for index in range(chunk_startindex, readindex): 3904 payload[index] ^= mask_key[index % 4] 3905 3906 result = payload[chunk_startindex:readindex] 3907 self._payload_head = readindex 3908 else: 3909 payload = bytearray() 3910 3911 # check if full frame arrived and reset readbuffer and payloadhead if needed 3912 if readindex == payload_length: 3913 self._readbuffer = bytearray() 3914 self._payload_head = 0 3915 3916 # respond to non-binary opcodes, their arrival is not guaranteed beacause of non-blocking sockets 3917 if opcode == WebsocketWrapper.OPCODE_CONNCLOSE: 3918 frame = self._create_frame( 3919 WebsocketWrapper.OPCODE_CONNCLOSE, payload, 0) 3920 self._socket.send(frame) 3921 3922 if opcode == WebsocketWrapper.OPCODE_PING: 3923 frame = self._create_frame( 3924 WebsocketWrapper.OPCODE_PONG, payload, 0) 3925 self._socket.send(frame) 3926 3927 # This isn't *proper* handling of continuation frames, but given 3928 # that we only support binary frames, it is *probably* good enough. 3929 if (opcode == WebsocketWrapper.OPCODE_BINARY or opcode == WebsocketWrapper.OPCODE_CONTINUATION) \ 3930 and payload_length > 0: 3931 return result 3932 else: 3933 raise BlockingIOError 3934 3935 except ConnectionError: 3936 self.connected = False 3937 return b'' 3938 3939 def _send_impl(self, data): 3940 3941 # if previous frame was sent successfully 3942 if len(self._sendbuffer) == 0: 3943 # create websocket frame 3944 frame = self._create_frame( 3945 WebsocketWrapper.OPCODE_BINARY, bytearray(data)) 3946 self._sendbuffer.extend(frame) 3947 self._requested_size = len(data) 3948 3949 # try to write out as much as possible 3950 length = self._socket.send(self._sendbuffer) 3951 3952 self._sendbuffer = self._sendbuffer[length:] 3953 3954 if len(self._sendbuffer) == 0: 3955 # buffer sent out completely, return with payload's size 3956 return self._requested_size 3957 else: 3958 # couldn't send whole data, request the same data again with 0 as sent length 3959 return 0 3960 3961 def recv(self, length): 3962 return self._recv_impl(length) 3963 3964 def read(self, length): 3965 return self._recv_impl(length) 3966 3967 def send(self, data): 3968 return self._send_impl(data) 3969 3970 def write(self, data): 3971 return self._send_impl(data) 3972 3973 def close(self): 3974 self._socket.close() 3975 3976 def fileno(self): 3977 return self._socket.fileno() 3978 3979 def pending(self): 3980 # Fix for bug #131: a SSL socket may still have data available 3981 # for reading without select() being aware of it. 3982 if self._ssl: 3983 return self._socket.pending() 3984 else: 3985 # normal socket rely only on select() 3986 return 0 3987 3988 def setblocking(self, flag): 3989 self._socket.setblocking(flag) 3990