1# -*- coding: utf-8 -*-
2# (c) 2014-2021 The mqttwarn developers
3from builtins import object
4from past.builtins import cmp
5from builtins import chr
6from builtins import str
7import os
8import sys
9import time
10import socket
11import logging
12import threading
13try:
14    from queue import Queue
15except ImportError:
16    # Backward-compatibility for Python 2
17    from Queue import Queue
18from datetime import datetime
19from pkg_resources import resource_filename
20
21import paho.mqtt.client as paho
22
23from mqttwarn.context import RuntimeContext, FunctionInvoker
24from mqttwarn.cron import PeriodicThread
25from mqttwarn.util import \
26    load_function, load_module_from_file, load_module_by_name, timeout, \
27    parse_cron_options, sanitize_function_name, Struct, Formatter, asbool, exception_traceback
28
29try:
30    import json
31except ImportError:
32    import simplejson as json
33
34
35
# Optional Jinja2 templating support. When jinja2 can be imported, `jenv' is the
# template environment used by render_template(); otherwise HAVE_JINJA is False
# and `jenv' stays undefined.
HAVE_JINJA = True
try:
    from jinja2 import Environment, FileSystemLoader
    # NOTE: 'templates/' is a relative path, so templates are looked up
    # relative to the process' current working directory.
    jenv = Environment(
            loader = FileSystemLoader('templates/', encoding='utf-8'),
            trim_blocks = True)
    # Make `jsonify' available as a template filter, serializing via json.dumps.
    jenv.filters['jsonify'] = json.dumps
except ImportError:
    HAVE_JINJA = False


# Module logger for the mqttwarn core.
logger = logging.getLogger(__name__)


# lwt values - may make these configurable later?
# LWTALIVE is published (retained) to the configured `lwt' topic after a
# successful connect; LWTDEAD is registered as the broker-side last will
# and is also published during cleanup.
LWTALIVE   = "1"
LWTDEAD    = "0"


# Name of calling program (may be overridden by bootstrap())
SCRIPTNAME = 'mqttwarn'

# Global runtime context object (a RuntimeContext, set by bootstrap())
context = None

# Global configuration object (set by bootstrap())
cf = None

# Global handle to MQTT client (a paho Client, set by connect())
mqttc = None


# Initialize processor queue.
# Jobs are put here by send_to_targets() and consumed by the processor()
# worker threads; maxsize=0 makes the queue unbounded.
q_in = Queue(maxsize=0)
# Set to True by cleanup() to make the worker and reconnect loops stop.
exit_flag = False

# Instances of PeriodicThread objects, keyed by the [cron] function name
ptlist = {}

# Instances of loaded service plugins:
# service name -> {'config': <service config>, 'module': <plugin module>}
# ('module' is absent when the plugin could not be loaded)
service_plugins = {}
77
78
# Helper object that is passed to each service plugin; instances are
# created through make_service().
class Service(object):
    """
    Helper object handed to every service plugin.

    Bundles the MQTT client, the logger the plugin should write to, a
    reference to this core module's global namespace and the program name.
    """

    def __init__(self, mqttc, logger):
        # Program name, normally "mqttwarn"
        self.SCRIPTNAME = SCRIPTNAME
        # Logger the plugin is expected to use
        self.logging = logger
        # Global namespace of this module, so plugins can use its machinery
        self.mwcore = globals()
        # PAHO MQTT client (None when created before connecting)
        self.mqttc = mqttc
95
96
def make_service(mqttc=None, name=None):
    """
    Create the ``Service`` object that gets passed to a plugin.

    :param mqttc: PAHO MQTT client to expose to the plugin, or None.
    :param name:  Name of the logger handed to the plugin; ``'unknown'``
                  is used when it is empty or None.
    :return:      A new ``Service`` instance.
    """
    plugin_logger = logging.getLogger(name if name else 'unknown')
    return Service(mqttc, plugin_logger)
111
112
class Job(object):
    """
    Unit of work for the processor threads: deliver one message to one
    target of one service.

    Jobs order by ``prio`` (lower sorts first).
    """

    def __init__(self, prio, service, section, topic, payload, data, target):
        self.prio       = prio
        self.service    = service
        self.section    = section
        self.topic      = topic
        self.payload    = payload       # raw payload
        self.data       = data          # transformation data from decode_payload()
        self.target     = target

        logger.debug("New `%s:%s' job: %s", service, target, topic)

    def __lt__(self, other):
        # Python 3 ignores __cmp__; rich comparison keeps jobs orderable,
        # e.g. for sorted(), heapq or queue.PriorityQueue.
        return self.prio < other.prio

    def __cmp__(self, other):
        # Python 2 ordering hook, kept for backward compatibility.
        return (self.prio > other.prio) - (self.prio < other.prio)
129
130
def render_template(filename, data):
    """
    Render the Jinja2 template *filename* with *data*.

    :return: The rendered text, or None when Jinja2 is not available.
    """
    if HAVE_JINJA is not True:
        return None
    return jenv.get_template(filename).render(data)
138
139
140# MQTT broker callbacks
def on_connect(mosq, userdata, flags, result_code):
    """
    Handle connections (or failures) to the broker.
    This is called after the client has received a CONNACK message
    from the broker in response to calling connect().

    On success, every distinct section topic is subscribed once and, if an
    LWT topic is configured, the "alive" value is published to it.
    Otherwise the reason for the refusal is logged.

    The result_code is one of;
    0: Success
    1: Refused - unacceptable protocol version
    2: Refused - identifier rejected
    3: Refused - server unavailable
    4: Refused - bad user name or password (MQTT v3.1 broker only)
    5: Refused - not authorised (MQTT v3.1 broker only)
    """
    if result_code == 0:
        logger.debug("Connected to MQTT broker, subscribing to topics...")
        if not cf.cleansession:
            logger.debug("Cleansession==False; previous subscriptions for clientid %s remain active on broker" % cf.clientid)

        already_subscribed = []
        for section in context.get_sections():
            sub_topic = context.get_topic(section)
            sub_qos = context.get_qos(section)
            if sub_topic not in already_subscribed:
                logger.debug("Subscribing to %s (qos=%d)" % (sub_topic, sub_qos))
                mqttc.subscribe(sub_topic, sub_qos)
                already_subscribed.append(sub_topic)

        if cf.lwt is not None:
            mqttc.publish(cf.lwt, LWTALIVE, qos=0, retain=True)
        return

    refusals = (
        (1, "Connection refused - unacceptable protocol version"),
        (2, "Connection refused - identifier rejected"),
        (3, "Connection refused - server unavailable"),
        (4, "Connection refused - bad user name or password"),
        (5, "Connection refused - not authorised"),
    )
    for code, reason in refusals:
        if result_code == code:
            logger.info(reason)
            return

    logger.warning("Connection failed - result code %d" % (result_code))
187
188
def on_disconnect(mosq, userdata, result_code):
    """
    Handle disconnections from the broker.

    A result code of 0 is logged as a clean disconnection. Any other code
    is reported to the failover targets, followed by a 5 second pause.
    """
    if result_code != 0:
        send_failover("brokerdisconnected", "Broker connection lost. Will attempt to reconnect in 5s...")
        time.sleep(5)
        return

    logger.info("Clean disconnection from broker")
198
199
def on_message(mosq, userdata, msg):
    """
    Message received from the broker.

    Decodes the payload as UTF-8 and optionally skips retained messages.
    Every section whose topic matches, and whose filter does not reject
    the message, gets it dispatched through send_to_targets().
    """

    topic = msg.topic
    try:
        payload = msg.payload.decode('utf-8')
    except UnicodeDecodeError as ex:
        # An exception raised in this callback can escape mqttc.loop_forever(),
        # which connect() re-raises, so a single binary payload must not be
        # fatal. Undecodable bytes are replaced with U+FFFD instead.
        logger.warning("Payload on %s is not valid UTF-8 (%s), decoding with replacement characters" % (topic, ex))
        payload = msg.payload.decode('utf-8', 'replace')
    logger.debug("Message received on %s: %s" % (topic, payload))

    if msg.retain == 1 and cf.skipretained:
        logger.debug("Skipping retained message on %s" % topic)
        return

    # Try to find matching settings for this topic
    for section in context.get_sections():
        # Get the topic for this section (usually the section name but optionally overridden)
        match_topic = context.get_topic(section)
        if not paho.topic_matches_sub(match_topic, topic):
            continue
        logger.debug("Section [%s] matches message on %s. Processing..." % (section, topic))
        # Check for any message filters
        if context.is_filtered(section, topic, payload):
            logger.debug("Filter in section [%s] has skipped message on %s" % (section, topic))
            continue
        # Send the message to any targets specified
        send_to_targets(section, topic, payload)
226# End of MQTT broker callbacks
227
228
def send_failover(reason, message):
    """
    Log *message* as a warning and route it through the ``[failover]`` section.

    :param reason:  Used as the topic of the failover notification.
    :param message: Used as its payload.
    """
    logger.warning(message)
    send_to_targets(section='failover', topic=reason, payload=message)
234
235
def _dispatcher_sort_key(item):
    """Sort key for a (topic, targets) pair of a dispatcher dict; use with reverse=True."""
    # precede a key with the number of topic levels and then use reverse alphabetic sort order
    # '+' is after '#' in ascii table
    # caveat: for instance space is allowed in topic name but will be less specific than '+', '#'
    # so replace '#' with first ascii character and '+' with second ascii character
    # http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a

    # item[0] represents topic. replace wildcard characters to ensure the right order
    modified_topic = item[0].replace('#', chr(0x01)).replace('+', chr(0x02))
    levels = len(item[0].split('/'))
    # concatenate levels with leading zeros and modified topic and return as a key
    return "{:03d}{}".format(levels, modified_topic)


def _resolve_targetlist(section, topic, data):
    """
    Determine the (not yet interpolated) target list of *section* for *topic*.

    The ``targets`` option is, in order of precedence: a user function
    reference, a topic -> targets dispatcher dict, or a plain list.

    :return: List of target strings, or None when nothing must be dispatched.
    """
    dispatcher_dict = cf.getdict(section, 'targets')
    function_name = sanitize_function_name(context.get_config(section, 'targets'))

    if function_name is not None:
        targetlist = context.get_topic_targets(section, topic, data)
        if not isinstance(targetlist, list):
            logger.error('Topic target definition by function "{function_name}" '
                         'in section "{section}" is empty or incorrect. Should be a list. '
                         'targetlist={targetlist}, type={targetlist_type}'.format(
                             function_name=function_name, section=section,
                             targetlist=targetlist, targetlist_type=type(targetlist)))
            return None
        return targetlist

    if isinstance(dispatcher_dict, dict):
        # Walk topic/targets pairs with longest and more specific first;
        # the first matching topic wins.
        for match_topic, targets in sorted(dispatcher_dict.items(), key=_dispatcher_sort_key, reverse=True):
            if paho.topic_matches_sub(match_topic, topic):
                logger.debug("Most specific match %s dispatched to %s" % (match_topic, targets))
                # hocus pocus, let targets become a list
                return targets if isinstance(targets, list) else [targets]
        # Not found then no action. This could be configured intentionally.
        logger.debug("Dispatcher definition does not contain matching topic/target pair in section [%s]" % section)
        return None

    targetlist = cf.getlist(section, 'targets')
    if not isinstance(targetlist, list):
        # if targets is neither dict nor list
        logger.error("Target definition in section [%s] is incorrect, should be a list." % section)
        cleanup(0)
        return None
    return targetlist


def _interpolate_targets(targetlist, section, topic, payload, data):
    """
    Format each target with the transformation data.

    Targets that cannot be interpolated are logged and dropped.
    """
    resolved = []
    for target in targetlist:
        try:
            resolved.append(target.format(**data))
        except Exception as ex:
            logger.error('Cannot interpolate transformation data into topic target "{target}": {error}. '
                         'section={section}, topic={topic}, payload={payload}, data={data}'.format(
                             target=target, error=repr(ex), section=section,
                             topic=topic, payload=payload, data=data))
    return resolved


def send_to_targets(section, topic, payload):
    """
    Dispatch a message to the targets configured in *section*.

    The payload is decoded into transformation data, the section's target
    list is resolved and interpolated with that data, and one Job per
    concrete service target is put on the processor queue.

    A target is either "service" (notify all targets of that service) or
    "service:target"; targets with any other number of colons are skipped.
    """
    if not cf.has_section(section):
        logger.warning("Section [%s] does not exist in your INI file, skipping message on %s" % (section, topic))
        return

    # decode raw payload into transformation data
    data = decode_payload(section, topic, payload)

    targetlist = _resolve_targetlist(section, topic, data)
    if targetlist is None:
        return

    # interpolate transformation data values into topic targets
    # be graceful if interpolation fails, but log a meaningful message
    for t in _interpolate_targets(targetlist, section, topic, payload, data):
        logger.debug("Message on %s going to %s" % (topic, t))

        service, target = t, None
        if ':' in t:
            parts = t.split(':')
            if len(parts) != 2:
                logger.warning("Invalid target %s - should be 'service:target'" % (t))
                continue
            service, target = parts

        # skip targets with invalid services
        if service not in service_plugins:
            logger.error("Invalid configuration: topic '%s' points to non-existing service '%s'" % (topic, service))
            continue

        sendtos = context.get_service_targets(service) if target is None else [target]
        for sendto in sendtos:
            q_in.put(Job(1, service, section, topic, payload, data, sendto))
333
334
def builtin_transform_data(topic, payload):
    ''' Return a dict with initial transformation data which is made
        available to all plugins.

        All time fields are derived from one single clock reading, so they
        always describe the same instant (previously each field read the
        clock separately and could straddle a second/minute boundary). '''

    now = time.time()
    dt = datetime.fromtimestamp(now)                    # local time

    tdata = {}
    tdata['topic']      = topic
    tdata['payload']    = payload
    tdata['_dtepoch']   = int(now)                      # 1392628581
    tdata['_dtiso']     = datetime.utcfromtimestamp(now).strftime("%Y-%m-%dT%H:%M:%S.%fZ")  # 2014-02-17T10:38:43.910691Z
    tdata['_ltiso']     = dt.isoformat()                # local time in iso format
    tdata['_dthhmm']    = dt.strftime('%H:%M')          # 10:16
    tdata['_dthhmmss']  = dt.strftime('%H:%M:%S')       # hhmmss=10:16:21

    return tdata
351
352
def xform(function, orig_value, transform_data):
    """
    Transform *orig_value* according to the *function* setting.

    - If *orig_value* is None, None is returned.
    - If *function* is None, *orig_value* is used unchanged.
    - If sanitize_function_name() recognizes *function* as a function
      reference, that user function is invoked via the runtime context and
      its result is returned as-is. When the invocation fails, a warning is
      logged and processing continues with the next step.
    - Otherwise *function* is used as an inline format string whose
      ``{xxxx}`` placeholders are filled from *transform_data*; on failure
      the error is logged and *orig_value* is kept.

    Literal backslash-n sequences in a string result become real newlines.
    """
    if orig_value is None:
        return None

    result = orig_value

    if function is not None:
        name = sanitize_function_name(function)
        if name is not None:
            try:
                return context.invoker.datamap(name, transform_data)
            except Exception as ex:
                logger.warning("Cannot invoke %s(): %s" % (name, ex))

        try:
            result = Formatter().format(function, **transform_data)
        except Exception:
            logger.exception("Formatting message failed")

    return result.replace("\\n", "\n") if isinstance(result, str) else result
384
385
def decode_payload(section, topic, payload):
    """
    Decode message payload through transformation machinery.

    Builds the transformation data for a message by merging, in order:
    the built-in fields, the section's topic data, the section's "all data"
    and, if the payload is a JSON object, its keys.

    :return: dict of transformation data.
    """

    transform_data = builtin_transform_data(topic, payload)

    topic_data = context.get_topic_data(section, topic)
    if isinstance(topic_data, dict):
        transform_data.update(topic_data)

    # The dict returned is completely merged into transformation data
    # The difference between this and `get_topic_data()' is that this
    # function obtains the topic string as well as the payload and any
    # existing transformation data, and it can do 'things' with all.
    # This is the way it should originally have been, but I can no
    # longer fix the original ... (legacy)

    all_data = context.get_all_data(section, topic, transform_data)
    if isinstance(all_data, dict):
        transform_data.update(all_data)

    # Attempt to decode the payload from JSON. If it's a JSON object, add
    # its keys to the transformation data passed to the plugin.
    try:
        payload = payload.rstrip("\0")
        payload_data = json.loads(payload)
    except Exception as ex:
        logger.debug(u"Cannot decode JSON object, payload={payload}: {ex}".format(payload=payload, ex=ex))
    else:
        # Only merge JSON objects: dict.update() would reject scalars and,
        # worse, silently accept a list of pairs such as [["topic", "x"]],
        # overwriting built-in fields.
        if isinstance(payload_data, dict):
            transform_data.update(payload_data)
        else:
            logger.debug(u"JSON payload is not an object, not merging it: payload={payload}".format(payload=payload))

    return transform_data
419
420
def processor(worker_id=None):
    """
    Queue runner. Pull a job from the queue, find the module in charge
    of handling the service, and invoke the module's plugin to do so.

    Every job is acknowledged with ``q_in.task_done()`` exactly once, even
    when handling it raises. An unexpected error therefore neither kills
    this worker thread nor makes ``q_in.join()`` in cleanup() wait forever.

    :param worker_id: Identifier of this worker, used in log messages only.
    """

    while not exit_flag:
        logger.debug('Job queue has %s items to process' % q_in.qsize())
        job = q_in.get()
        try:
            _process_job(job, worker_id)
        except Exception:
            logger.exception("Processor #%s failed to handle job for service `%s', target `%s'" % (
                worker_id, job.service, job.target))
        finally:
            q_in.task_done()

    logger.debug("Thread exiting...")


def _process_job(job, worker_id):
    """Build the notification item for a single Job and hand it to its service plugin."""
    service = job.service
    section = job.section
    target  = job.target
    topic   = job.topic

    logger.debug("Processor #%s is handling: `%s' for %s" % (worker_id, service, target))

    # Sanity checks.
    # If service configuration or targets can not be obtained successfully,
    # log a sensible error message, fail the job and carry on with the next job.
    try:
        service_config  = context.get_service_config(service)
        service_targets = context.get_service_targets(service)

        if target not in service_targets:
            error_message = "Invalid configuration: Topic '{topic}' points to " \
                            "non-existing target '{target}' in service '{service}'".format(
                                topic=topic, target=target, service=service)
            raise KeyError(error_message)

    except Exception as ex:
        logger.error("Cannot handle service=%s, target=%s: %s\n%s" % (service, target, ex, exception_traceback()))
        return

    transform_data = job.data

    item = {
        'service'       : service,
        'section'       : section,
        'target'        : target,
        'config'        : service_config,
        'addrs'         : service_targets[target],
        'topic'         : topic,
        'payload'       : job.payload,
        'data'          : dict(transform_data),     # shallow copy for the plugin
        'title'         : None,
        'image'         : None,
        'message'       : None,
        'priority'      : None
    }

    item['title'] = xform(context.get_config(section, 'title'), SCRIPTNAME, transform_data)
    item['image'] = xform(context.get_config(section, 'image'), '', transform_data)
    item['message'] = xform(context.get_config(section, 'format'), job.payload, transform_data)

    try:
        item['priority'] = int(xform(context.get_config(section, 'priority'), 0, transform_data))
    except Exception as e:
        item['priority'] = 0
        logger.warning("Failed to determine the priority, defaulting to zero: %s" % e)

    if HAVE_JINJA is False and context.get_config(section, 'template'):
        logger.warning("Templating not possible because Jinja2 is not installed")

    if HAVE_JINJA is True:
        template = context.get_config(section, 'template')
        if template is not None:
            try:
                text = render_template(template, transform_data)
                if text is not None:
                    item['message'] = text
            except Exception as e:
                logger.warning("Cannot render `%s' template: %s" % (template, e))

    message = item.get('message')
    if message is None or len(message) == 0:
        logger.warning("Notification of %s for `%s' suppressed: text is empty" % (service, item.get('topic')))
        return

    st = Struct(**item)
    notified = False
    logger.info("Invoking service plugin for `%s'" % service)
    try:
        # Fire the plugin in a separate thread and kill it if it doesn't return in 10s
        module = service_plugins[service]['module']
        if '.' in service:
            service_logger_name = service
        else:
            service_logger_name = 'mqttwarn.services.{}'.format(service)
        srv = make_service(mqttc=mqttc, name=service_logger_name)
        notified = timeout(module.plugin, (srv, st))
    except Exception:
        logger.exception("Cannot invoke service for `%s'" % service)

    if not notified:
        logger.warning("Notification of %s for `%s' FAILED or TIMED OUT" % (service, item.get('topic')))
520
521
def load_services(services):
    """
    Load the service plugin module for every name in *services*.

    For each service, its ``[config:<service>]`` section is stored in
    ``service_plugins[service]['config']`` and the loaded module in
    ``service_plugins[service]['module']``. The ``module`` option (default:
    the service name) selects the source:

    - a ``.py`` file, used as given or relative to the configuration file's
      directory (the first candidate that loads wins);
    - a dotted module name;
    - otherwise a built-in module from ``mqttwarn.services``.

    Exits the program with status 1 when a service has no config section.
    A service whose module cannot be loaded is logged as an error.
    """
    for service in services:
        service_plugins[service] = {}

        service_config = cf.config('config:' + service)
        if service_config is None:
            logger.error("Service `%s' has no config section" % service)
            sys.exit(1)

        service_plugins[service]['config'] = service_config

        module = cf.g('config:' + service, 'module', service)

        # Load external service from file.
        modulefile_candidates = []
        if module.endswith(".py"):
            # Add two candidates: a) Use the file as given and b) treat the file as relative to
            # the directory of the configuration file. That retains backward compatibility.
            modulefile_candidates.append(module)
            modulefile_candidates.append(os.path.join(cf.configuration_path, module))

        # Load external service with module specification.
        elif '.' in module:
            logger.debug('Trying to load service "{}" from module "{}"'.format(service, module))
            try:
                service_plugins[service]['module'] = load_module_by_name(module)
                logger.info('Successfully loaded service "{}" from module "{}"'.format(service, module))
                continue
            except Exception as ex:
                logger.exception('Unable to load service "{}" from module "{}": {}'.format(service, module, ex))

        # Load built-in service module.
        else:
            modulefile_candidates = [ resource_filename('mqttwarn.services', module + '.py') ]

        for modulefile in modulefile_candidates:
            if not os.path.isfile(modulefile):
                continue
            logger.debug('Trying to load service "{}" from file "{}"'.format(service, modulefile))
            try:
                service_plugins[service]['module'] = load_module_from_file(modulefile)
                logger.info('Successfully loaded service "{}"'.format(service))
                # First successful candidate wins; don't execute a second
                # (possibly identical, e.g. absolute-path) module file.
                break
            except Exception as ex:
                logger.exception('Unable to load service "{}" from file "{}": {}'.format(service, modulefile, ex))

        if 'module' not in service_plugins[service]:
            logger.error('Service "{}" is not loaded: module "{}" could not be found or loaded'.format(service, module))
566
567
def connect():
    """
    Load service plugins, connect to the broker, launch daemon threads and listen forever.

    Sets the global ``mqttc``. Exits the program with status 2 when no
    services are configured or the initial broker connection fails. While
    ``exit_flag`` is not set, the network loop is restarted after socket
    errors with a fixed pause.
    """

    # FIXME: Remove global variables
    global mqttc

    try:
        services = cf.getlist('defaults', 'launch')
    except Exception:
        logger.error("No services configured. Aborting")
        sys.exit(2)

    load_services(services)

    # Initialize MQTT broker connection
    mqttc = paho.Client(cf.clientid, clean_session=cf.cleansession, protocol=cf.protocol)

    logger.debug("Attempting connection to MQTT broker %s:%d..." % (cf.hostname, int(cf.port)))
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_disconnect = on_disconnect

    # check for authentication
    if cf.username:
        mqttc.username_pw_set(cf.username, cf.password)

    # set the lwt before connecting
    if cf.lwt is not None:
        logger.debug("Setting LWT to %s..." % (cf.lwt))
        mqttc.will_set(cf.lwt, payload=LWTDEAD, qos=0, retain=True)

    # Delays will be: 3, 6, 12, 24, 30, 30, ...
    # mqttc.reconnect_delay_set(delay=3, delay_max=30, exponential_backoff=True)

    if cf.tls == True:
        mqttc.tls_set(cf.ca_certs, cf.certfile, cf.keyfile, tls_version=cf.tls_version, ciphers=None)
        if cf.tls_insecure:
            mqttc.tls_insecure_set(True)
    elif cf.tls_insecure:
        # paho's tls_insecure_set() raises ValueError unless tls_set() has
        # configured an SSL context; the option is meaningless without TLS.
        logger.warning("Option tls_insecure is set, but TLS is not enabled; ignoring it")

    try:
        mqttc.connect(cf.hostname, int(cf.port), 60)

    except Exception as e:
        logger.error("Cannot connect to MQTT broker at %s:%d: %s" % (cf.hostname, int(cf.port), e))
        sys.exit(2)

    # Update our runtime context (used by functions etc) now we have a connected MQTT client
    context.invoker.srv.mqttc = mqttc

    # Launch worker threads to operate on queue
    start_workers()

    reconnect_interval = 5
    while not exit_flag:
        try:
            mqttc.loop_forever()
        except socket.error as ex:
            logger.debug("MQTT network loop stopped on socket error: %s" % ex)

        if not exit_flag:
            logger.warning("MQTT server disconnected, trying to reconnect each %s seconds" % reconnect_interval)
            time.sleep(reconnect_interval)
637
638
def start_workers():
    """
    Launch the queue worker threads and the periodic ``[cron]`` tasks.

    Starts ``cf.num_workers`` daemon threads running processor(). If the
    configuration has a ``[cron]`` section, each key names a function from
    the configured functions module (``cf.functions``) and each value is
    parsed by parse_cron_options() into an interval (the period in seconds)
    and an optional ``now`` flag. Every such function is started as a
    PeriodicThread and registered in ``ptlist``. Functions that do not exist
    are logged and skipped.
    """
    logger.info('Starting %s worker threads' % cf.num_workers)
    for worker_id in range(cf.num_workers):
        worker = threading.Thread(target=processor, kwargs={'worker_id': worker_id})
        worker.daemon = True
        worker.start()

    if not cf.has_section('cron'):
        return

    for name, val in cf.items('cron'):
        try:
            func = load_function(name=name, py_mod=cf.functions)
            cron_options = parse_cron_options(val)
            interval = cron_options['interval']
            logger.debug('Scheduling function "{name}" as periodic task '
                         'to run each {interval} seconds via [cron] section'.format(name=name, interval=interval))
            periodic = PeriodicThread(callback=func, period=interval, name=name,
                                      srv=make_service(mqttc=mqttc, name='mqttwarn.cron'),
                                      now=asbool(cron_options.get('now')))
            ptlist[name] = periodic
            periodic.start()
        except AttributeError:
            logger.error("[cron] section has function [%s] specified, but that's not defined" % name)
666
667
def cleanup(signum=None, frame=None):
    """
    Signal handler to ensure we disconnect cleanly
    in the event of a SIGTERM or SIGINT.

    Cancels periodic tasks, publishes the LWT "dead" value and disconnects
    from the broker (if a client exists), waits for the job queue to drain,
    tells worker loops to stop and exits with *signum* as exit status.

    :param signum: Signal number, also used as the exit status; may be None.
    :param frame:  Current stack frame (unused, part of the handler signature).
    """
    # FIXME: Remove global variables
    global exit_flag

    for ptname in ptlist:
        logger.debug("Cancel %s timer" % ptname)
        ptlist[ptname].cancel()

    # mqttc is only created by connect(); if cleanup() runs earlier,
    # there is nothing to disconnect.
    if mqttc is not None:
        logger.debug("Disconnecting from MQTT broker...")
        if cf.lwt is not None:
            mqttc.publish(cf.lwt, LWTDEAD, qos=0, retain=True)
        mqttc.loop_stop()
        mqttc.disconnect()

    logger.info("Waiting for queue to drain")
    q_in.join()

    # Send exit signal to subsystems _after_ queue was drained
    exit_flag = True

    # %s instead of %d: signum defaults to None.
    logger.debug("Exiting on signal %s", signum)
    sys.exit(signum)
692
693
def bootstrap(config=None, scriptname=None):
    """
    Initialize the module-level runtime state from *config*.

    Sets the globals ``cf``, ``context`` and, when given, ``SCRIPTNAME``.
    Must be called before connect(), which relies on ``cf`` and ``context``.

    :param config:     The configuration object.
    :param scriptname: Name of the calling program. When empty or None, the
                       current ``SCRIPTNAME`` (default ``'mqttwarn'``) is kept.
    """
    # FIXME: Remove global variables
    global context, cf, SCRIPTNAME
    # Set the name first, so the Service created for the function invoker
    # below already carries it.
    if scriptname:
        SCRIPTNAME = scriptname
    # NOTE: this is called before we connect to the MQTT broker, so mqttc is not initialised yet
    invoker = FunctionInvoker(config=config, srv=make_service(mqttc=None, name='mqttwarn.context'))
    context = RuntimeContext(config=config, invoker=invoker)
    cf = config
702
703
def run_plugin(config=None, name=None, data=None):
    """
    Run service plugins directly without the
    dispatching and transformation machinery.

    On the one hand, this might look like a bit of a hack.
    On the other hand, it shows very clearly how some of
    the innards of mqttwarn interact so it might also please
    newcomers as a "learning the guts of mqttwarn" example.

    :param config: The configuration object
    :param name:   The name of the service plugin, e.g. "log" or "file"
    :param data:   The data to be converted into an appropriate item Struct
                   object instance; None is treated as an empty mapping.
    """

    # Bootstrap mqttwarn core
    # TODO: Optionally run w/o configuration.
    bootstrap(config=config)

    # Load designated service plugins
    load_services([name])
    if '.' in name:
        service_logger_name = name
    else:
        service_logger_name = 'mqttwarn.services.{}'.format(name)
    srv = make_service(mqttc=None, name=service_logger_name)

    # Build a mimikry item instance for feeding to the service plugin.
    # Struct(**None) would raise TypeError, so fall back to an empty mapping.
    item = Struct(**(data or {}))
    # TODO: Read configuration optionally from data.
    item.config = config.config('config:' + name)
    item.service = srv
    item.target = 'mqttwarn'
    item.data = {}        # FIXME

    # Launch plugin
    module = service_plugins[name]['module']
    response = module.plugin(srv, item)
    logger.info('Plugin response: {}'.format(response))
743