1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2021 Chris Caron <lead2gold@gmail.com>
4# All rights reserved.
5#
6# This code is licensed under the MIT License.
7#
8# Permission is hereby granted, free of charge, to any person obtaining a copy
9# of this software and associated documentation files(the "Software"), to deal
10# in the Software without restriction, including without limitation the rights
11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
12# copies of the Software, and to permit persons to whom the Software is
13# furnished to do so, subject to the following conditions :
14#
15# The above copyright notice and this permission notice shall be included in
16# all copies or substantial portions of the Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24# THE SOFTWARE.
25
26# PAHO MQTT Documentation:
27#  https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php
28#
29# Looking at the PAHO MQTT Source can help shed light on what's going on too
30# as their inline documentation is pretty good!
31#   https://github.com/eclipse/paho.mqtt.python\
32#           /blob/master/src/paho/mqtt/client.py
33import ssl
34import re
35import six
36from time import sleep
37from datetime import datetime
38from os.path import isfile
39from .NotifyBase import NotifyBase
40from ..URLBase import PrivacyMode
41from ..common import NotifyType
42from ..utils import parse_list
43from ..utils import parse_bool
44from ..AppriseLocale import gettext_lazy as _
45
# Tracks whether the paho-mqtt dependency could be imported; the plugin
# stays disabled until the import below succeeds
NOTIFY_MQTT_SUPPORT_ENABLED = False

if six.PY2:
    # Python v2.7 has no built-in ConnectionError; provide a stand-in so
    # that `except ConnectionError` is still a valid clause there
    class ConnectionError(Exception):
        pass

try:
    # 3rd party modules
    import paho.mqtt.client as mqtt

except ImportError:
    # No problem; the paho-mqtt package is simply not available so there
    # are no protocol versions to map to
    MQTT_PROTOCOL_MAP = {}

else:
    # Maps the digits of a user supplied version string to the matching
    # paho protocol constant
    MQTT_PROTOCOL_MAP = {
        # v3.1.1
        "311": mqtt.MQTTv311,
        # v3.1
        "31": mqtt.MQTTv31,
        # v5.0
        "5": mqtt.MQTTv5,
        # v5.0 (alias)
        "50": mqtt.MQTTv5,
    }

    # We're good to go!
    NOTIFY_MQTT_SUPPORT_ENABLED = True

# A lookup map for relaying version to user
HUMAN_MQTT_PROTOCOL_MAP = {
    "v3.1.1": "311",
    "v3.1": "31",
    "v5.0": "5",
}
83
84
class NotifyMQTT(NotifyBase):
    """
    A wrapper for MQTT Notifications

    The notification body is published to every configured topic through a
    paho-mqtt client.  The client object is created in __init__(), but the
    connection itself is only established on the first call to send().
    """

    # Set our global enabled flag
    enabled = NOTIFY_MQTT_SUPPORT_ENABLED

    requirements = {
        # Define our required packaging in order to work
        'packages_required': 'paho-mqtt'
    }

    # The default descriptive name associated with the Notification
    service_name = 'MQTT Notification'

    # The default protocol
    protocol = 'mqtt'

    # Secure protocol
    secure_protocol = 'mqtts'

    # A URL that takes you to the setup/help of the specific protocol
    setup_url = 'https://github.com/caronc/apprise/wiki/Notify_mqtt'

    # MQTT does not have a title
    title_maxlen = 0

    # The maximum length a body can be set to
    body_maxlen = 268435455

    # Use a throttle; but it doesn't need to be so strict since most
    # MQTT server hostings can handle the small bursts of packets and are
    # locally hosted anyway
    request_rate_per_sec = 0.5

    # Port Defaults (unless otherwise specified)
    mqtt_insecure_port = 1883

    # The default secure port to use (if mqtts://)
    mqtt_secure_port = 8883

    # The default mqtt keepalive value
    mqtt_keepalive = 30

    # The default mqtt transport
    mqtt_transport = "tcp"

    # The number of seconds to wait for a publish to occur at before
    # checking to see if it's been sent yet.
    mqtt_block_time_sec = 0.2

    # Set the maximum number of messages with QoS>0 that can be part way
    # through their network flow at once.
    mqtt_inflight_messages = 200

    # Taken from https://golang.org/src/crypto/x509/root_linux.go
    CA_CERTIFICATE_FILE_LOCATIONS = [
        # Debian/Ubuntu/Gentoo etc.
        "/etc/ssl/certs/ca-certificates.crt",
        # Fedora/RHEL 6
        "/etc/pki/tls/certs/ca-bundle.crt",
        # OpenSUSE
        "/etc/ssl/ca-bundle.pem",
        # OpenELEC
        "/etc/pki/tls/cacert.pem",
        # CentOS/RHEL 7
        "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    ]

    # Define object templates
    templates = (
        '{schema}://{user}@{host}/{topic}',
        '{schema}://{user}@{host}:{port}/{topic}',
        '{schema}://{user}:{password}@{host}/{topic}',
        '{schema}://{user}:{password}@{host}:{port}/{topic}',
    )

    template_tokens = dict(NotifyBase.template_tokens, **{
        'host': {
            'name': _('Hostname'),
            'type': 'string',
            'required': True,
        },
        'port': {
            'name': _('Port'),
            'type': 'int',
            'min': 1,
            'max': 65535,
        },
        'user': {
            'name': _('User Name'),
            'type': 'string',
            'required': True,
        },
        'password': {
            'name': _('Password'),
            'type': 'string',
            'private': True,
            'required': True,
        },
        'topic': {
            'name': _('Target Queue'),
            'type': 'string',
            'map_to': 'targets',
        },
        'targets': {
            'name': _('Targets'),
            'type': 'list:string',
        },
    })

    # Define our template arguments
    template_args = dict(NotifyBase.template_args, **{
        'to': {
            'alias_of': 'targets',
        },
        'qos': {
            'name': _('QOS'),
            'type': 'int',
            'default': 0,
            'min': 0,
            'max': 2,
        },
        'version': {
            'name': _('Version'),
            'type': 'choice:string',
            'values': HUMAN_MQTT_PROTOCOL_MAP,
            'default': "v3.1.1",
        },
        'client_id': {
            'name': _('Client ID'),
            'type': 'string',
        },
        'session': {
            'name': _('Use Session'),
            'type': 'bool',
            'default': False,
        },
    })

    def __init__(self, targets=None, version=None, qos=None,
                 client_id=None, session=None, **kwargs):
        """
        Initialize MQTT Object

        targets:   the topic(s) to publish to (anything parse_list() accepts)
        version:   the MQTT protocol version (e.g. "v3.1.1"); only the
                   digits are used for the lookup.  Defaults to the
                   template_args default when None.
        qos:       the Quality of Service; must be an integer within the
                   min/max defined in template_args.
        client_id: the (optional) client id handed to the paho client
        session:   whether to keep the session; only honoured when a
                   client_id was provided as well

        A TypeError is raised if the qos or version is invalid.
        """

        super(NotifyMQTT, self).__init__(**kwargs)

        # Initialize topics
        self.topics = parse_list(targets)

        if version is None:
            self.version = self.template_args['version']['default']
        else:
            self.version = version

        # Save our client id if specified
        self.client_id = client_id

        # Maintain our session (associated with our user id if set); without
        # a client id we always fall back to the default
        self.session = self.template_args['session']['default'] \
            if session is None or not self.client_id \
            else parse_bool(session)

        # Set up our Quality of Service (QoS)
        try:
            self.qos = self.template_args['qos']['default'] \
                if qos is None else int(qos)

            if self.qos < self.template_args['qos']['min'] \
                    or self.qos > self.template_args['qos']['max']:
                # Out of range; share the error handling found below
                raise ValueError("")

        except (ValueError, TypeError):
            msg = 'An invalid MQTT QOS ({}) was specified.'.format(qos)
            self.logger.warning(msg)
            raise TypeError(msg)

        if not self.port:
            # Assign port (if not otherwise set)
            self.port = self.mqtt_secure_port \
                if self.secure else self.mqtt_insecure_port

        self.ca_certs = None
        if self.secure:
            # Use the first CA certificate bundle found on this system; this
            # remains None if there are not any (send() aborts in that case)
            self.ca_certs = next(
                (cert for cert in self.CA_CERTIFICATE_FILE_LOCATIONS
                 if isfile(cert)), None)

        # Set up our MQTT Publisher
        try:
            # Get our protocol; everything but the digits is dropped so that
            # "v3.1.1", "3.1.1" and "311" all resolve to the same entry
            self.mqtt_protocol = \
                MQTT_PROTOCOL_MAP[re.sub(r'[^0-9]+', '', self.version)]

        except KeyError:
            # Report the effective version (not the raw argument which is
            # None when the default was used)
            msg = 'An invalid MQTT Protocol version ' \
                '({}) was specified.'.format(self.version)
            self.logger.warning(msg)
            raise TypeError(msg)

        # Our MQTT Client Object
        self.client = mqtt.Client(
            client_id=self.client_id,
            clean_session=not self.session, userdata=None,
            protocol=self.mqtt_protocol, transport=self.mqtt_transport,
        )

        # Our maximum number of in-flight messages
        self.client.max_inflight_messages_set(self.mqtt_inflight_messages)

        # Toggled to False once our connection has been established at least
        # once
        self.__initial_connect = True

    def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
        """
        Perform MQTT Notification

        The body is published to each configured topic.  The first call
        also sets up the credentials, TLS and the connection itself.

        Returns True if the body was published to all topics and False if
        there were no topics, the connection could not be established or
        sustained, or a publish failed or timed out.  Processing stops at
        the first topic that fails.
        """

        if not self.topics:
            # There were no services to notify
            self.logger.warning('There were no MQTT topics to notify.')
            return False

        # For logging:
        url = '{host}:{port}'.format(host=self.host, port=self.port)

        try:
            if self.__initial_connect:
                # Our initial connection
                if self.user:
                    self.client.username_pw_set(
                        self.user, password=self.password)

                if self.secure:
                    if self.ca_certs is None:
                        self.logger.warning(
                            'MQTT Secure comunication can not be verified; '
                            'no local CA certificate file')
                        return False

                    self.client.tls_set(
                        ca_certs=self.ca_certs, certfile=None, keyfile=None,
                        cert_reqs=ssl.CERT_REQUIRED,
                        tls_version=ssl.PROTOCOL_TLS,
                        ciphers=None)

                    # Set our TLS Verify Flag; paho's tls_insecure_set(True)
                    # turns the hostname verification *off*, hence we must
                    # pass in the inverse of our verify_certificate flag
                    self.client.tls_insecure_set(not self.verify_certificate)

                # Establish our connection
                if self.client.connect(
                        self.host, port=self.port,
                        keepalive=self.mqtt_keepalive) \
                        != mqtt.MQTT_ERR_SUCCESS:
                    self.logger.warning(
                        'An MQTT connection could not be established for {}'.
                        format(url))
                    return False

                # Start our client loop
                self.client.loop_start()

                # Throttle our start otherwise the starting handshaking doesnt
                # work. I'm not sure if this is a bug or not, but with qos=0,
                # and without this sleep(), the messages randomly fails to be
                # delivered.
                sleep(0.01)

                # Toggle our flag since we never need to enter this area again
                self.__initial_connect = False

            # Create a copy of the topics list (it is consumed below)
            topics = list(self.topics)

            has_error = False
            while len(topics) > 0 and not has_error:
                # Retrieve our next topic
                topic = topics.pop()

                # For logging:
                url = '{host}:{port}/{topic}'.format(
                    host=self.host,
                    port=self.port,
                    topic=topic)

                # Always call throttle before any remote server i/o is made
                self.throttle()

                # handle a re-connection
                if not self.client.is_connected() and \
                        self.client.reconnect() != mqtt.MQTT_ERR_SUCCESS:
                    self.logger.warning(
                        'An MQTT connection could not be sustained for {}'.
                        format(url))
                    has_error = True
                    break

                # Some Debug Logging
                self.logger.debug('MQTT POST URL: {} (cert_verify={})'.format(
                    url, self.verify_certificate))
                self.logger.debug('MQTT Payload: %s' % str(body))

                result = self.client.publish(
                    topic, payload=body, qos=self.qos, retain=False)

                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    # Toggle our status
                    self.logger.warning(
                        'An error (rc={}) occured when sending MQTT to {}'.
                        format(result.rc, url))
                    has_error = True
                    break

                elif not result.is_published():
                    self.logger.debug(
                        'Blocking until MQTT payload is published...')
                    reference = datetime.now()
                    while not has_error and not result.is_published():
                        # Throttle
                        sleep(self.mqtt_block_time_sec)

                        # Our own throttle so we can abort eventually....
                        elapsed = (datetime.now() - reference).total_seconds()
                        if elapsed >= self.socket_read_timeout:
                            self.logger.warning(
                                'The MQTT message could not be delivered')
                            has_error = True

                # if we reach here; we're at the bottom of our loop
                # we loop around and do the next topic now

        except ConnectionError as e:
            self.logger.warning(
                'MQTT Connection Error received from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        except ssl.CertificateError as e:
            self.logger.warning(
                'MQTT SSL Certificate Error received from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        except ValueError as e:
            # ValueError's are thrown from publish() call if there is a problem
            self.logger.warning(
                'MQTT Publishing error received: from {}'.format(url))
            self.logger.debug('Socket Exception: %s' % str(e))
            return False

        return not has_error

    def url(self, privacy=False, *args, **kwargs):
        """
        Returns the URL built dynamically based on specified arguments.

        The version, qos and session are always part of the query string;
        the client_id is only added if one was set.  The port is left out
        when it matches the default of the schema in use.
        """

        # Define any URL parameters
        params = {
            'version': self.version,
            'qos': str(self.qos),
            'session': 'yes' if self.session else 'no',
        }

        if self.client_id:
            # Our client id is set if specified
            params['client_id'] = self.client_id

        # Extend our parameters
        params.update(self.url_parameters(privacy=privacy, *args, **kwargs))

        # Determine Authentication
        auth = ''
        if self.user and self.password:
            auth = '{user}:{password}@'.format(
                user=NotifyMQTT.quote(self.user, safe=''),
                password=self.pprint(
                    self.password, privacy, mode=PrivacyMode.Secret, safe=''),
            )
        elif self.user:
            auth = '{user}@'.format(
                user=NotifyMQTT.quote(self.user, safe=''),
            )

        default_port = self.mqtt_secure_port \
            if self.secure else self.mqtt_insecure_port

        return '{schema}://{auth}{hostname}{port}/{targets}?{params}'.format(
            schema=self.secure_protocol if self.secure else self.protocol,
            auth=auth,
            # never encode hostname since we're expecting it to be a valid one
            hostname=self.host,
            port='' if self.port is None or self.port == default_port
                 else ':{}'.format(self.port),
            targets=','.join(
                [NotifyMQTT.quote(x, safe='/') for x in self.topics]),
            params=NotifyMQTT.urlencode(params),
        )

    @staticmethod
    def parse_url(url):
        """
        Parses the URL and returns the arguments required to initialize
        this object.

        The topic(s) are taken from the path of the URL and from the
        optional to= argument.  The version=, client_id=, session= and
        qos= arguments are passed along when they are not empty.

        Whatever NotifyBase.parse_url() returned is handed back as is if
        the URL could not be parsed.
        """

        results = NotifyBase.parse_url(url)
        if not results:
            # We're done early as we couldn't load the results
            return results

        try:
            # Acquire topic(s)
            results['targets'] = parse_list(
                NotifyMQTT.unquote(results['fullpath'].lstrip('/')))

        except AttributeError:
            # No 'fullpath' specified
            results['targets'] = []

        # The MQTT protocol version to use
        if 'version' in results['qsd'] and len(results['qsd']['version']):
            results['version'] = \
                NotifyMQTT.unquote(results['qsd']['version'])

        # The MQTT Client ID
        if 'client_id' in results['qsd'] and len(results['qsd']['client_id']):
            results['client_id'] = \
                NotifyMQTT.unquote(results['qsd']['client_id'])

        # Whether or not the session should be kept
        if 'session' in results['qsd'] and len(results['qsd']['session']):
            results['session'] = parse_bool(results['qsd']['session'])

        # The MQTT Quality of Service to use
        if 'qos' in results['qsd'] and len(results['qsd']['qos']):
            results['qos'] = \
                NotifyMQTT.unquote(results['qsd']['qos'])

        # The 'to' makes it easier to use yaml configuration
        if 'to' in results['qsd'] and len(results['qsd']['to']):
            results['targets'].extend(
                NotifyMQTT.parse_list(results['qsd']['to']))

        # return results
        return results
537