1# -*- coding: utf-8 -*- 2# 3# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com> 4# All rights reserved. 5# 6# This code is licensed under the MIT License. 7# 8# Permission is hereby granted, free of charge, to any person obtaining a copy 9# of this software and associated documentation files(the "Software"), to deal 10# in the Software without restriction, including without limitation the rights 11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 12# copies of the Software, and to permit persons to whom the Software is 13# furnished to do so, subject to the following conditions : 14# 15# The above copyright notice and this permission notice shall be included in 16# all copies or substantial portions of the Software. 17# 18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 24# THE SOFTWARE. 25 26# PAHO MQTT Documentation: 27# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php 28# 29# Looking at the PAHO MQTT Source can help shed light on what's going on too 30# as their inline documentation is pretty good! 
# https://github.com/eclipse/paho.mqtt.python\
#     /blob/master/src/paho/mqtt/client.py
import ssl
import re
import six
from time import sleep
from datetime import datetime
from os.path import isfile
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _

# Default our global support flag
NOTIFY_MQTT_SUPPORT_ENABLED = False

if six.PY2:
    # Handle Python v2.7 support; ConnectionError is not a built-in there
    class ConnectionError(Exception):
        pass

try:
    # 3rd party modules
    import paho.mqtt.client as mqtt

    # We're good to go!
    NOTIFY_MQTT_SUPPORT_ENABLED = True

    # Maps the digits of a user supplied version string (e.g. "v3.1.1"
    # becomes "311") to the matching paho protocol constant
    MQTT_PROTOCOL_MAP = {
        # v3.1.1
        "311": mqtt.MQTTv311,
        # v3.1
        "31": mqtt.MQTTv31,
        # v5.0
        "5": mqtt.MQTTv5,
        # v5.0 (alias)
        "50": mqtt.MQTTv5,
    }

except ImportError:
    # No problem; we simply can't support this plugin because the
    # paho-mqtt package could not be imported. With an empty map every
    # version lookup in NotifyMQTT.__init__() fails with a TypeError.
    MQTT_PROTOCOL_MAP = {}

# A lookup map for relaying version to user
HUMAN_MQTT_PROTOCOL_MAP = {
    "v3.1.1": "311",
    "v3.1": "31",
    "v5.0": "5",
}


class NotifyMQTT(NotifyBase):
    """
    A wrapper for MQTT Notifications
    """

    # Set our global enabled flag
    enabled = NOTIFY_MQTT_SUPPORT_ENABLED

    requirements = {
        # Define our required packaging in order to work
        'packages_required': 'paho-mqtt'
    }

    # The default descriptive name associated with the Notification
    service_name = 'MQTT Notification'

    # The default protocol
    protocol = 'mqtt'

    # Secure protocol
    secure_protocol = 'mqtts'

    # A URL that takes you to the setup/help of the specific protocol
    setup_url = 'https://github.com/caronc/apprise/wiki/Notify_mqtt'

    # MQTT does not have a title
    title_maxlen = 0

    # The maximum length a body can be set to
    body_maxlen = 268435455

    # Use a throttle; but it doesn't need to be so strict since most
    # MQTT server hostings can handle the small bursts of packets and are
    # locally hosted anyway
    request_rate_per_sec = 0.5

    # Port Defaults (unless otherwise specified)
    mqtt_insecure_port = 1883

    # The default secure port to use (if mqtts://)
    mqtt_secure_port = 8883

    # The default mqtt keepalive value
    mqtt_keepalive = 30

    # The default mqtt transport
    mqtt_transport = "tcp"

    # The number of seconds to wait for a publish to occur at before
    # checking to see if it's been sent yet.
    mqtt_block_time_sec = 0.2

    # Set the maximum number of messages with QoS>0 that can be part way
    # through their network flow at once.
    mqtt_inflight_messages = 200

    # Taken from https://golang.org/src/crypto/x509/root_linux.go
    CA_CERTIFICATE_FILE_LOCATIONS = [
        # Debian/Ubuntu/Gentoo etc.
        "/etc/ssl/certs/ca-certificates.crt",
        # Fedora/RHEL 6
        "/etc/pki/tls/certs/ca-bundle.crt",
        # OpenSUSE
        "/etc/ssl/ca-bundle.pem",
        # OpenELEC
        "/etc/pki/tls/cacert.pem",
        # CentOS/RHEL 7
        "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    ]

    # Define object templates
    templates = (
        '{schema}://{user}@{host}/{topic}',
        '{schema}://{user}@{host}:{port}/{topic}',
        '{schema}://{user}:{password}@{host}/{topic}',
        '{schema}://{user}:{password}@{host}:{port}/{topic}',
    )

    template_tokens = dict(NotifyBase.template_tokens, **{
        'host': {
            'name': _('Hostname'),
            'type': 'string',
            'required': True,
        },
        'port': {
            'name': _('Port'),
            'type': 'int',
            'min': 1,
            'max': 65535,
        },
        'user': {
            'name': _('User Name'),
            'type': 'string',
            'required': True,
        },
        'password': {
            'name': _('Password'),
            'type': 'string',
            'private': True,
            'required': True,
        },
        'topic': {
            'name': _('Target Queue'),
            'type': 'string',
            'map_to': 'targets',
        },
        'targets': {
            'name': _('Targets'),
            'type': 'list:string',
        },
    })

    # Define our template arguments
    template_args = dict(NotifyBase.template_args, **{
        'to': {
            'alias_of': 'targets',
        },
        'qos': {
            'name': _('QOS'),
            'type': 'int',
            'default': 0,
            'min': 0,
            'max': 2,
        },
        'version': {
            'name': _('Version'),
            'type': 'choice:string',
            'values': HUMAN_MQTT_PROTOCOL_MAP,
            'default': "v3.1.1",
        },
        'client_id': {
            'name': _('Client ID'),
            'type': 'string',
        },
        'session': {
            'name': _('Use Session'),
            'type': 'bool',
            'default': False,
        },
    })

    def __init__(self, targets=None, version=None, qos=None,
                 client_id=None, session=None, **kwargs):
        """
        Initialize MQTT Object

        Args:
            targets: the topic(s) to publish to; parsed with parse_list().
            version: the MQTT protocol version (e.g. "v3.1.1", "v5.0");
                defaults to the `version` template default when None.
            qos: the Quality of Service level (0-2); defaults to the `qos`
                template default when None.
            client_id: the optional MQTT client id to identify ourselves as.
            session: whether to maintain a session; this is only honoured
                when a client_id is also provided.
            **kwargs: passed through to NotifyBase.

        Raises:
            TypeError: if the qos or version specified is invalid.
        """

        super(NotifyMQTT, self).__init__(**kwargs)

        # Initialize topics
        self.topics = parse_list(targets)

        if version is None:
            self.version = self.template_args['version']['default']
        else:
            self.version = version

        # Save our client id if specified
        self.client_id = client_id

        # Maintain our session (associated with our user id if set)
        self.session = self.template_args['session']['default'] \
            if session is None or not self.client_id \
            else parse_bool(session)

        # Set up our Quality of Service (QoS)
        try:
            self.qos = self.template_args['qos']['default'] \
                if qos is None else int(qos)

            if self.qos < self.template_args['qos']['min'] \
                    or self.qos > self.template_args['qos']['max']:
                # Let the error get handled by the except clause below
                raise ValueError("")

        except (ValueError, TypeError):
            msg = 'An invalid MQTT QOS ({}) was specified.'.format(qos)
            self.logger.warning(msg)
            raise TypeError(msg)

        if not self.port:
            # Assign port (if not otherwise set)
            self.port = self.mqtt_secure_port \
                if self.secure else self.mqtt_insecure_port

        self.ca_certs = None
        if self.secure:
            # Use the first CA bundle found on this system; send() aborts
            # if none could be located
            self.ca_certs = next(
                (cert for cert in self.CA_CERTIFICATE_FILE_LOCATIONS
                 if isfile(cert)), None)

        # Set up our MQTT Publisher
        try:
            # Get our protocol; only the digits of the version are used
            # for the lookup
            self.mqtt_protocol = \
                MQTT_PROTOCOL_MAP[re.sub(r'[^0-9]+', '', self.version)]

        except (KeyError):
            msg = 'An invalid MQTT Protocol version ' \
                '({}) was specified.'.format(version)
            self.logger.warning(msg)
            raise TypeError(msg)

        # Our MQTT Client Object
        self.client = mqtt.Client(
            client_id=self.client_id,
            clean_session=not self.session, userdata=None,
            protocol=self.mqtt_protocol, transport=self.mqtt_transport,
        )

        # Our maximum number of in-flight messages
        self.client.max_inflight_messages_set(self.mqtt_inflight_messages)

        # Toggled to False once our connection has been established at least
        # once
        self.__initial_connect = True

        # Toggled to True once the TLS settings have been applied to our
        # client. The paho client refuses to have its SSL context
        # configured a second time, so this must only ever happen once even
        # if the initial connection has to be re-attempted.
        self.__tls_configured = False

    def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
        """
        Perform MQTT Notification

        Connects to the broker on first use and then publishes the body to
        every topic configured, blocking until each message is published or
        the socket read timeout elapses.

        Args:
            body: the payload to publish.
            title: unused (MQTT has no concept of a title).
            notify_type: unused by this plugin.

        Returns:
            True if the body was published to all topics, otherwise False.
        """

        if len(self.topics) == 0:
            # There were no services to notify
            self.logger.warning('There were no MQTT topics to notify.')
            return False

        # For logging:
        url = '{host}:{port}'.format(host=self.host, port=self.port)

        try:
            if self.__initial_connect:
                # Our initial connection
                if self.user:
                    self.client.username_pw_set(
                        self.user, password=self.password)

                if self.secure and not self.__tls_configured:
                    if self.ca_certs is None:
                        self.logger.warning(
                            'MQTT Secure comunication can not be verified; '
                            'no local CA certificate file')
                        return False

                    self.client.tls_set(
                        ca_certs=self.ca_certs, certfile=None, keyfile=None,
                        cert_reqs=ssl.CERT_REQUIRED,
                        tls_version=ssl.PROTOCOL_TLS,
                        ciphers=None)

                    # Set our TLS Verify Flag; tls_insecure_set(True)
                    # *disables* the hostname verification, so we must
                    # pass in the inverse of our verify flag
                    self.client.tls_insecure_set(not self.verify_certificate)

                    # Never configure TLS again; a retry after a failed
                    # connection would otherwise raise a ValueError
                    self.__tls_configured = True

                # Establish our connection
                if self.client.connect(
                        self.host, port=self.port,
                        keepalive=self.mqtt_keepalive) \
                        != mqtt.MQTT_ERR_SUCCESS:
                    self.logger.warning(
                        'An MQTT connection could not be established for {}'.
                        format(url))
                    return False

                # Start our client loop
                self.client.loop_start()

                # Throttle our start otherwise the starting handshaking
                # doesn't work. I'm not sure if this is a bug or not, but
                # with qos=0, and without this sleep(), the messages
                # randomly fail to be delivered.
                sleep(0.01)

                # Toggle our flag since we never need to enter this area again
                self.__initial_connect = False

            # Create a copy of the topics list
            topics = list(self.topics)

            has_error = False
            while len(topics) > 0 and not has_error:
                # Retrieve our topic
                topic = topics.pop()

                # For logging:
                url = '{host}:{port}/{topic}'.format(
                    host=self.host,
                    port=self.port,
                    topic=topic)

                # Always call throttle before any remote server i/o is made
                self.throttle()

                # handle a re-connection
                if not self.client.is_connected() and \
                        self.client.reconnect() != mqtt.MQTT_ERR_SUCCESS:
                    self.logger.warning(
                        'An MQTT connection could not be sustained for {}'.
                        format(url))
                    has_error = True
                    break

                # Some Debug Logging
                self.logger.debug('MQTT POST URL: {} (cert_verify={})'.format(
                    url, self.verify_certificate))
                self.logger.debug('MQTT Payload: %s' % str(body))

                result = self.client.publish(
                    topic, payload=body, qos=self.qos, retain=False)

                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    # Toggle our status
                    self.logger.warning(
                        'An error (rc={}) occured when sending MQTT to {}'.
                        format(result.rc, url))
                    has_error = True
                    break

                elif not result.is_published():
                    self.logger.debug(
                        'Blocking until MQTT payload is published...')
                    reference = datetime.now()
                    while not has_error and not result.is_published():
                        # Throttle
                        sleep(self.mqtt_block_time_sec)

                        # Our own throttle so we can abort eventually....
                        elapsed = (datetime.now() - reference).total_seconds()
                        if elapsed >= self.socket_read_timeout:
                            self.logger.warning(
                                'The MQTT message could not be delivered')
                            has_error = True

                # if we reach here; we're at the bottom of our loop
                # we loop around and do the next topic now

        except ConnectionError as e:
            self.logger.warning(
                'MQTT Connection Error received from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        except ssl.CertificateError as e:
            self.logger.warning(
                'MQTT SSL Certificate Error received from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        except ValueError as e:
            # ValueError's are thrown from publish() call if there is a problem
            self.logger.warning(
                'MQTT Publishing error received: from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        return not has_error

    def url(self, privacy=False, *args, **kwargs):
        """
        Returns the URL built dynamically based on specified arguments.

        Args:
            privacy: when True, the password is passed through pprint() in
                PrivacyMode.Secret mode instead of being written as is.

        Returns:
            The mqtt:// or mqtts:// URL representing this object.
        """

        # Define any URL parameters
        params = {
            'version': self.version,
            'qos': str(self.qos),
            'session': 'yes' if self.session else 'no',
        }

        if self.client_id:
            # Our client id is set if specified
            params['client_id'] = self.client_id

        # Extend our parameters
        params.update(self.url_parameters(privacy=privacy, *args, **kwargs))

        # Determine Authentication
        auth = ''
        if self.user and self.password:
            auth = '{user}:{password}@'.format(
                user=NotifyMQTT.quote(self.user, safe=''),
                password=self.pprint(
                    self.password, privacy, mode=PrivacyMode.Secret, safe=''),
            )
        elif self.user:
            auth = '{user}@'.format(
                user=NotifyMQTT.quote(self.user, safe=''),
            )

        default_port = self.mqtt_secure_port \
            if self.secure else self.mqtt_insecure_port

        return '{schema}://{auth}{hostname}{port}/{targets}?{params}'.format(
            schema=self.secure_protocol if self.secure else self.protocol,
            auth=auth,
            # never encode hostname since we're expecting it to be a valid one
            hostname=self.host,
            # the port is only written if it differs from the schema default
            port='' if self.port is None or self.port == default_port
                 else ':{}'.format(self.port),
            targets=','.join(
                [NotifyMQTT.quote(x, safe='/') for x in self.topics]),
            params=NotifyMQTT.urlencode(params),
        )

    @staticmethod
    def parse_url(url):
        """
        Parses the URL and returns enough arguments that can allow
        us to re-instantiate this object.

        The topic(s) are taken from the URL path and from the `to=` query
        argument; `version`, `client_id`, `session` and `qos` are read from
        the query string when present and non-empty.
        """

        results = NotifyBase.parse_url(url)
        if not results:
            # We're done early as we couldn't load the results
            return results

        try:
            # Acquire topic(s)
            results['targets'] = parse_list(
                NotifyMQTT.unquote(results['fullpath'].lstrip('/')))

        except AttributeError:
            # No 'fullpath' specified
            results['targets'] = []

        # The MQTT protocol version to use
        if 'version' in results['qsd'] and len(results['qsd']['version']):
            results['version'] = \
                NotifyMQTT.unquote(results['qsd']['version'])

        # The MQTT Client ID
        if 'client_id' in results['qsd'] and len(results['qsd']['client_id']):
            results['client_id'] = \
                NotifyMQTT.unquote(results['qsd']['client_id'])

        if 'session' in results['qsd'] and len(results['qsd']['session']):
            results['session'] = parse_bool(results['qsd']['session'])

        # The MQTT Quality of Service to use
        if 'qos' in results['qsd'] and len(results['qsd']['qos']):
            results['qos'] = \
                NotifyMQTT.unquote(results['qsd']['qos'])

        # The 'to' makes it easier to use yaml configuration
        if 'to' in results['qsd'] and len(results['qsd']['to']):
            results['targets'].extend(
                NotifyMQTT.parse_list(results['qsd']['to']))

        # return results
        return results