1# -*- coding: utf-8 -*- 2# (c) 2014-2021 The mqttwarn developers 3from builtins import object 4from past.builtins import cmp 5from builtins import chr 6from builtins import str 7import os 8import sys 9import time 10import socket 11import logging 12import threading 13try: 14 from queue import Queue 15except ImportError: 16 # Backward-compatibility for Python 2 17 from Queue import Queue 18from datetime import datetime 19from pkg_resources import resource_filename 20 21import paho.mqtt.client as paho 22 23from mqttwarn.context import RuntimeContext, FunctionInvoker 24from mqttwarn.cron import PeriodicThread 25from mqttwarn.util import \ 26 load_function, load_module_from_file, load_module_by_name, timeout, \ 27 parse_cron_options, sanitize_function_name, Struct, Formatter, asbool, exception_traceback 28 29try: 30 import json 31except ImportError: 32 import simplejson as json 33 34 35 36HAVE_JINJA = True 37try: 38 from jinja2 import Environment, FileSystemLoader 39 jenv = Environment( 40 loader = FileSystemLoader('templates/', encoding='utf-8'), 41 trim_blocks = True) 42 jenv.filters['jsonify'] = json.dumps 43except ImportError: 44 HAVE_JINJA = False 45 46 47logger = logging.getLogger(__name__) 48 49 50# lwt values - may make these configurable later? 
LWTALIVE = "1"
LWTDEAD = "0"


# Name of calling program
SCRIPTNAME = 'mqttwarn'

# Global runtime context object
context = None

# Global configuration object
cf = None

# Global handle to MQTT client
mqttc = None


# Initialize processor queue
q_in = Queue(maxsize=0)
exit_flag = False

# Instances of PeriodicThread objects
ptlist = {}

# Instances of loaded service plugins
service_plugins = {}


# Class with helper functions which is passed to each plugin
# and its global instantiation
class Service(object):
    """
    Helper object handed to service plugins and periodic functions.

    It bundles the MQTT client, a logger, the name of the program and
    a reference to this module's globals.
    """

    def __init__(self, mqttc, logger):

        # Reference to MQTT client object
        self.mqttc = mqttc

        # Reference to all mqttwarn globals, for using its machinery from plugins
        self.mwcore = globals()

        # Reference to logging object
        self.logging = logger

        # Name of self ("mqttwarn", mostly)
        self.SCRIPTNAME = SCRIPTNAME


def make_service(mqttc=None, name=None):
    """
    Service object factory.
    Prepare service object for plugin.
    Inject appropriate MQTT client and logger objects.

    :param mqttc: Instance of PAHO MQTT client object.
    :param name: Name used for obtaining a logger instance.
                 Falls back to "unknown" when empty.
    :return: Service object ready for being passed to plugin instance.
    """
    name = name or 'unknown'
    logger = logging.getLogger(name)
    service = Service(mqttc, logger)
    return service


def _service_logger_name(service):
    """
    Return the logger name for a service: dotted service names are used
    verbatim, all others are prefixed with "mqttwarn.services.".
    """
    if '.' in service:
        return service
    return 'mqttwarn.services.{}'.format(service)


class Job(object):
    """
    Unit of work placed on the processor queue: one message, destined
    for one target of one service.
    """

    def __init__(self, prio, service, section, topic, payload, data, target):
        self.prio = prio
        self.service = service
        self.section = section
        self.topic = topic
        self.payload = payload      # raw payload
        self.data = data            # decoded payload
        self.target = target

        logger.debug("New `%s:%s' job: %s" % (service, target, topic))

    def __cmp__(self, other):
        # Python 2 ordering protocol. Python 3 ignores this method.
        return cmp(self.prio, other.prio)

    def __lt__(self, other):
        # Python 3 ordering protocol: order jobs by priority, like `__cmp__`.
        return self.prio < other.prio


def render_template(filename, data):
    """
    Render the Jinja2 template `filename` using `data` as context.

    :return: The rendered text, or None when Jinja2 is not available.
    """
    text = None
    if HAVE_JINJA is True:
        template = jenv.get_template(filename)
        text = template.render(data)

    return text


# MQTT broker callbacks
def on_connect(mosq, userdata, flags, result_code):
    """
    Handle connections (or failures) to the broker.
    This is called after the client has received a CONNACK message
    from the broker in response to calling connect().

    On success, subscribe once to the topic of each configured section
    and, if an LWT topic is configured, publish the "alive" value to it.

    The result_code is one of;
    0: Success
    1: Refused - unacceptable protocol version
    2: Refused - identifier rejected
    3: Refused - server unavailable
    4: Refused - bad user name or password (MQTT v3.1 broker only)
    5: Refused - not authorised (MQTT v3.1 broker only)
    """
    if result_code == 0:
        logger.debug("Connected to MQTT broker, subscribing to topics...")
        if not cf.cleansession:
            logger.debug("Cleansession==False; previous subscriptions for clientid %s remain active on broker" % cf.clientid)

        # Several sections may share the same topic; subscribe only once.
        subscribed = set()
        for section in context.get_sections():
            topic = context.get_topic(section)
            qos = context.get_qos(section)

            if topic in subscribed:
                continue

            logger.debug("Subscribing to %s (qos=%d)" % (topic, qos))
            mqttc.subscribe(topic, qos)
            subscribed.add(topic)

        if cf.lwt is not None:
            mqttc.publish(cf.lwt, LWTALIVE, qos=0, retain=True)

    elif result_code == 1:
        logger.info("Connection refused - unacceptable protocol version")
    elif result_code == 2:
        logger.info("Connection refused - identifier rejected")
    elif result_code == 3:
        logger.info("Connection refused - server unavailable")
    elif result_code == 4:
        logger.info("Connection refused - bad user name or password")
    elif result_code == 5:
        logger.info("Connection refused - not authorised")
    else:
        logger.warning("Connection failed - result code %d" % (result_code))


def on_disconnect(mosq, userdata, result_code):
    """
    Handle disconnections from the broker.

    A non-zero result code is reported to the "failover" targets,
    followed by a pause of five seconds.
    """
    if result_code == 0:
        logger.info("Clean disconnection from broker")
    else:
        send_failover("brokerdisconnected", "Broker connection lost. Will attempt to reconnect in 5s...")
        time.sleep(5)


def on_message(mosq, userdata, msg):
    """
    Message received from the broker.

    The payload is decoded as UTF-8 and dispatched to the targets of
    every section whose topic matches and whose filter lets it pass.
    """

    topic = msg.topic
    payload = msg.payload.decode('utf-8')
    logger.debug("Message received on %s: %s" % (topic, payload))

    if msg.retain == 1 and cf.skipretained:
        logger.debug("Skipping retained message on %s" % topic)
        return

    # Try to find matching settings for this topic
    for section in context.get_sections():
        # Get the topic for this section (usually the section name but optionally overridden)
        match_topic = context.get_topic(section)
        if paho.topic_matches_sub(match_topic, topic):
            logger.debug("Section [%s] matches message on %s. Processing..." % (section, topic))
            # Check for any message filters
            if context.is_filtered(section, topic, payload):
                logger.debug("Filter in section [%s] has skipped message on %s" % (section, topic))
                continue
            # Send the message to any targets specified
            send_to_targets(section, topic, payload)
# End of MQTT broker callbacks


def send_failover(reason, message):
    """
    Log `message` as a warning and dispatch it to the targets of the
    "failover" section, using `reason` as the topic.
    """
    # Make sure we dump this event to the log
    logger.warning(message)
    # Attempt to send the message to our failover targets
    send_to_targets('failover', reason, message)


def _dispatcher_sort_key(item):
    """
    Sort key for (topic, targets) pairs of a dispatcher dictionary.
    Sorting by this key in reverse order puts longer and more specific
    topics first.
    """
    # precede a key with the number of topic levels and then use reverse alphabetic sort order
    # '+' is after '#' in ascii table
    # caveat: for instance space is allowed in topic name but will be less specific than '+', '#'
    # so replace '#' with first ascii character and '+' with second ascii character
    # http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a

    # item[0] represents topic. replace wildcard characters to ensure the right order
    modified_topic = item[0].replace('#', chr(0x01)).replace('+', chr(0x02))
    levels = len(item[0].split('/'))
    # concatenate levels with leading zeros and modified topic and return as a key
    return "{:03d}{}".format(levels, modified_topic)


def send_to_targets(section, topic, payload):
    """
    Resolve the targets configured for `section` and enqueue one job per
    service target for the message received on `topic`.

    The `targets` setting of the section is interpreted as, in this order:
    a function computing the target list, a dictionary mapping topics to
    targets (the most specific matching topic wins) or a plain list.

    :param section: Name of the configuration section.
    :param topic: Topic the message was received on.
    :param payload: Raw message payload.
    """
    if not cf.has_section(section):
        logger.warning("Section [%s] does not exist in your INI file, skipping message on %s" % (section, topic))
        return

    # decode raw payload into transformation data
    data = decode_payload(section, topic, payload)

    dispatcher_dict = cf.getdict(section, 'targets')
    function_name = sanitize_function_name(context.get_config(section, 'targets'))

    if function_name is not None:
        targetlist = context.get_topic_targets(section, topic, data)
        if not isinstance(targetlist, list):
            logger.error('Topic target definition by function "{function_name}" '
                         'in section "{section}" is empty or incorrect. Should be a list. '
                         'targetlist={targetlist}, type={targetlist_type}'.format(
                             function_name=function_name, section=section,
                             targetlist=targetlist, targetlist_type=type(targetlist)))
            return

    elif isinstance(dispatcher_dict, dict):
        # produce a sorted list of topic/targets with longest and more specific first
        sorted_dispatcher = sorted(dispatcher_dict.items(), key=_dispatcher_sort_key, reverse=True)
        for match_topic, targets in sorted_dispatcher:
            if paho.topic_matches_sub(match_topic, topic):
                # hocus pocus, let targets become a list
                targetlist = targets if isinstance(targets, list) else [targets]
                logger.debug("Most specific match %s dispatched to %s" % (match_topic, targets))
                # first most specific topic matches then stops processing
                break
        else:
            # Not found then no action. This could be configured intentionally.
            logger.debug("Dispatcher definition does not contain matching topic/target pair in section [%s]" % section)
            return
    else:
        targetlist = cf.getlist(section, 'targets')
        if not isinstance(targetlist, list):
            # if targets is neither dict nor list
            logger.error("Target definition in section [%s] is incorrect, should be a list." % section)
            cleanup(0)
            return

    # interpolate transformation data values into topic targets
    # be graceful if interpolation fails, but log a meaningful message
    targetlist_resolved = []
    for target in targetlist:
        try:
            targetlist_resolved.append(target.format(**data))
        except Exception as ex:
            logger.error('Cannot interpolate transformation data into topic target "{target}": {error}. '
                         'section={section}, topic={topic}, payload={payload}, data={data}'.format(
                             target=target, error=repr(ex), section=section,
                             topic=topic, payload=payload, data=data))
    targetlist = targetlist_resolved

    for t in targetlist:
        logger.debug("Message on %s going to %s" % (topic, t))
        # Each target is either "service" or "service:target"
        # If no target specified then notify ALL targets
        service = t
        target = None

        # Check if this is for a specific target
        if t.find(':') != -1:
            try:
                service, target = t.split(':', 2)
            except ValueError:
                # More than one colon: the split yields three parts.
                logger.warning("Invalid target %s - should be 'service:target'" % (t))
                continue

        # skip targets with invalid services
        if service not in service_plugins:
            logger.error("Invalid configuration: topic '%s' points to non-existing service '%s'" % (topic, service))
            continue

        if target is None:
            sendtos = context.get_service_targets(service)
        else:
            sendtos = [target]

        for sendto in sendtos:
            job = Job(1, service, section, topic, payload, data, sendto)
            q_in.put(job)


def builtin_transform_data(topic, payload):
    """
    Return a dict with initial transformation data which is made
    available to all plugins.

    All local time fields are derived from a single clock reading,
    so they are consistent with each other.
    """

    tdata = {}
    dt = datetime.now()

    tdata['topic'] = topic
    tdata['payload'] = payload
    tdata['_dtepoch'] = int(time.time())           # 1392628581
    tdata['_dtiso'] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")  # 2014-02-17T10:38:43.910691Z
    tdata['_ltiso'] = dt.isoformat()               # local time in iso format
    tdata['_dthhmm'] = dt.strftime('%H:%M')        # 10:16
    tdata['_dthhmmss'] = dt.strftime('%H:%M:%S')   # hhmmss=10:16:21

    return tdata


def xform(function, orig_value, transform_data):
    """
    Attempt transformation on orig_value.

    - 1st. function()
    - 2nd. inline {xxxx}

    :param function: Configuration value, either naming a function or
                     being a format string. May be None.
    :param orig_value: Value returned when `function` is None or when
                       neither transformation succeeds. When it is None,
                       None is returned right away.
    :param transform_data: Transformation data for function and formatter.
    :return: The transformed value. In strings, the two-character
             sequence backslash + "n" is replaced by a newline, except
             for values returned by a function.
    """

    if orig_value is None:
        return None

    res = orig_value

    if function is not None:
        function_name = sanitize_function_name(function)
        if function_name is not None:
            try:
                return context.invoker.datamap(function_name, transform_data)
            except Exception as e:
                logger.warning("Cannot invoke %s(): %s" % (function_name, e))

        try:
            res = Formatter().format(function, **transform_data)
        except Exception:
            logger.exception("Formatting message failed")

    if isinstance(res, str):
        res = res.replace("\\n", "\n")

    return res


def decode_payload(section, topic, payload):
    """
    Decode message payload through transformation machinery.

    The result starts with the builtin transformation data and is then
    updated, in this order, with the section's topic data, the section's
    "all data" and the decoded JSON payload, if any of them applies.

    :return: Dictionary of transformation data.
    """

    transform_data = builtin_transform_data(topic, payload)

    topic_data = context.get_topic_data(section, topic)
    if topic_data is not None and isinstance(topic_data, dict):
        transform_data.update(topic_data)

    # The dict returned is completely merged into transformation data
    # The difference between this and `get_topic_data()' is that this
    # function obtains the topic string as well as the payload and any
    # existing transformation data, and it can do 'things' with all.
    # This is the way it should originally have been, but I can no
    # longer fix the original ... (legacy)

    all_data = context.get_all_data(section, topic, transform_data)
    if all_data is not None and isinstance(all_data, dict):
        transform_data.update(all_data)

    # Attempt to decode the payload from JSON. If it's possible, add
    # the JSON keys into item to pass to the plugin, and create the
    # outgoing (i.e. transformed) message.
    try:
        payload = payload.rstrip("\0")
        payload_data = json.loads(payload)
        transform_data.update(payload_data)
    except Exception as ex:
        logger.debug(u"Cannot decode JSON object, payload={payload}: {ex}".format(payload=payload, ex=ex))

    return transform_data


def _process_job(job, worker_id=None):
    """
    Handle a single job: build the item for the plugin, apply the
    configured transformations and invoke the service plugin.
    """

    service = job.service
    section = job.section
    target = job.target
    topic = job.topic

    logger.debug("Processor #%s is handling: `%s' for %s" % (worker_id, service, target))

    # Sanity checks.
    # If service configuration or targets can not be obtained successfully,
    # log a sensible error message, fail the job and carry on with the next job.
    try:
        service_config = context.get_service_config(service)
        service_targets = context.get_service_targets(service)

        if target not in service_targets:
            error_message = "Invalid configuration: Topic '{topic}' points to " \
                            "non-existing target '{target}' in service '{service}'".format(
                                topic=topic, target=target, service=service)
            raise KeyError(error_message)

    except Exception as ex:
        logger.error("Cannot handle service=%s, target=%s: %s\n%s" % (service, target, ex, exception_traceback()))
        return

    item = {
        'service'       : service,
        'section'       : section,
        'target'        : target,
        'config'        : service_config,
        'addrs'         : service_targets[target],
        'topic'         : topic,
        'payload'       : job.payload,
        'data'          : None,
        'title'         : None,
        'image'         : None,
        'message'       : None,
        'priority'      : None
    }

    transform_data = job.data
    # Hand a shallow copy of the transformation data to the plugin.
    item['data'] = dict(transform_data)

    item['title'] = xform(context.get_config(section, 'title'), SCRIPTNAME, transform_data)
    item['image'] = xform(context.get_config(section, 'image'), '', transform_data)
    item['message'] = xform(context.get_config(section, 'format'), job.payload, transform_data)

    try:
        item['priority'] = int(xform(context.get_config(section, 'priority'), 0, transform_data))
    except Exception as e:
        item['priority'] = 0
        logger.warning("Failed to determine the priority, defaulting to zero: %s" % e)

    if HAVE_JINJA is False and context.get_config(section, 'template'):
        logger.warning("Templating not possible because Jinja2 is not installed")

    if HAVE_JINJA is True:
        template = context.get_config(section, 'template')
        if template is not None:
            try:
                text = render_template(template, transform_data)
                if text is not None:
                    item['message'] = text
            except Exception as e:
                logger.warning("Cannot render `%s' template: %s" % (template, e))

    if item.get('message') is not None and len(item.get('message')) > 0:
        st = Struct(**item)
        notified = False
        logger.info("Invoking service plugin for `%s'" % service)
        try:
            # Fire the plugin in a separate thread and kill it if it doesn't return in 10s
            # NOTE(review): that behaviour lives in `mqttwarn.util.timeout` - confirm there.
            module = service_plugins[service]['module']
            srv = make_service(mqttc=mqttc, name=_service_logger_name(service))
            notified = timeout(module.plugin, (srv, st))
        except Exception:
            logger.exception("Cannot invoke service for `%s'" % service)

        if not notified:
            logger.warning("Notification of %s for `%s' FAILED or TIMED OUT" % (service, item.get('topic')))
    else:
        logger.warning("Notification of %s for `%s' suppressed: text is empty" % (service, item.get('topic')))


def processor(worker_id=None):
    """
    Queue runner. Pull a job from the queue, find the module in charge
    of handling the service, and invoke the module's plugin to do so.

    Every job is marked as done, even when handling it fails
    unexpectedly. Otherwise, the worker thread would die and
    `q_in.join()` would wait forever for the unfinished job.

    :param worker_id: Identifier of this worker, used for logging only.
    """

    while not exit_flag:
        logger.debug('Job queue has %s items to process' % q_in.qsize())
        job = q_in.get()

        try:
            _process_job(job, worker_id)
        except Exception:
            logger.exception("Unexpected error in processor #%s while handling a job" % worker_id)
        finally:
            q_in.task_done()

    logger.debug("Thread exiting...")


def load_services(services):
    """
    Load the plugin module and configuration of each service into the
    global `service_plugins` registry.

    The `module` setting of a service, defaulting to the service name,
    is interpreted as a Python file when it ends with ".py", as a module
    name when it contains a dot, and as a built-in service otherwise.

    Exits the program with status 1 when a service has no configuration.

    :param services: Iterable of service names.
    """
    for service in services:
        service_plugins[service] = {}

        service_config = cf.config('config:' + service)
        if service_config is None:
            logger.error("Service `%s' has no config section" % service)
            sys.exit(1)

        service_plugins[service]['config'] = service_config

        module = cf.g('config:' + service, 'module', service)

        # Load external service from file.
        modulefile_candidates = []
        if module.endswith(".py"):
            # Add two candidates: a) Use the file as given and b) treat the file as relative to
            # the directory of the configuration file. That retains backward compatibility.
            modulefile_candidates.append(module)
            modulefile_candidates.append(os.path.join(cf.configuration_path, module))

        # Load external service with module specification.
        elif '.' in module:
            logger.debug('Trying to load service "{}" from module "{}"'.format(service, module))
            try:
                service_plugins[service]['module'] = load_module_by_name(module)
                logger.info('Successfully loaded service "{}" from module "{}"'.format(service, module))
                continue
            except Exception as ex:
                logger.exception('Unable to load service "{}" from module "{}": {}'.format(service, module, ex))

        # Load built-in service module.
        else:
            modulefile_candidates = [ resource_filename('mqttwarn.services', module + '.py') ]

        for modulefile in modulefile_candidates:
            if not os.path.isfile(modulefile):
                continue
            logger.debug('Trying to load service "{}" from file "{}"'.format(service, modulefile))
            try:
                service_plugins[service]['module'] = load_module_from_file(modulefile)
                logger.info('Successfully loaded service "{}"'.format(service))
            except Exception as ex:
                logger.exception('Unable to load service "{}" from file "{}": {}'.format(service, modulefile, ex))

        # Report the problem here instead of failing with a bare KeyError
        # when the plugin is invoked later on.
        if 'module' not in service_plugins[service]:
            logger.error('Unable to load service "{}": no plugin module could be loaded for "{}"'.format(service, module))


def connect():
    """
    Load service plugins, connect to the broker, launch daemon threads and listen forever.

    Exits the program with status 2 when no services are configured
    or when the initial connection to the broker fails.
    """

    # FIXME: Remove global variables
    global mqttc

    try:
        services = cf.getlist('defaults', 'launch')
    except Exception:
        logger.error("No services configured. Aborting")
        sys.exit(2)

    load_services(services)

    # Initialize MQTT broker connection
    mqttc = paho.Client(cf.clientid, clean_session=cf.cleansession, protocol=cf.protocol)

    logger.debug("Attempting connection to MQTT broker %s:%d..." % (cf.hostname, int(cf.port)))
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.on_disconnect = on_disconnect

    # check for authentication
    if cf.username:
        mqttc.username_pw_set(cf.username, cf.password)

    # set the lwt before connecting
    if cf.lwt is not None:
        logger.debug("Setting LWT to %s..." % (cf.lwt))
        mqttc.will_set(cf.lwt, payload=LWTDEAD, qos=0, retain=True)

    # Delays will be: 3, 6, 12, 24, 30, 30, ...
    # mqttc.reconnect_delay_set(delay=3, delay_max=30, exponential_backoff=True)

    if cf.tls == True:
        mqttc.tls_set(cf.ca_certs, cf.certfile, cf.keyfile, tls_version=cf.tls_version, ciphers=None)

    if cf.tls_insecure:
        mqttc.tls_insecure_set(True)

    try:
        mqttc.connect(cf.hostname, int(cf.port), 60)

    except Exception as e:
        logger.error("Cannot connect to MQTT broker at %s:%d: %s" % (cf.hostname, int(cf.port), e))
        sys.exit(2)

    # Update our runtime context (used by functions etc) now we have a connected MQTT client
    context.invoker.srv.mqttc = mqttc

    # Launch worker threads to operate on queue
    start_workers()

    reconnect_interval = 5
    while not exit_flag:

        # Socket errors end the network loop and trigger a new attempt
        # below. Any other exception propagates to the caller.
        # FIXME: add logging with trace
        try:
            mqttc.loop_forever()
        except socket.error:
            pass

        if not exit_flag:
            logger.warning("MQTT server disconnected, trying to reconnect each %s seconds" % reconnect_interval)
            time.sleep(reconnect_interval)


def start_workers():
    """
    Start the queue worker threads and the periodic tasks defined
    in the [cron] section of the configuration, if there is one.
    """

    # Launch worker threads to operate on queue
    logger.info('Starting %s worker threads' % cf.num_workers)
    for i in range(cf.num_workers):
        t = threading.Thread(target=processor, kwargs={'worker_id': i})
        t.daemon = True
        t.start()

    # If the config file has a [cron] section, the key names therein are
    # functions from 'myfuncs.py' which should be invoked periodically.
    # The key's value (must be numeric!) is the period in seconds.

    if cf.has_section('cron'):
        for name, val in cf.items('cron'):
            try:
                func = load_function(name=name, py_mod=cf.functions)
                cron_options = parse_cron_options(val)
                interval = cron_options['interval']
                logger.debug('Scheduling function "{name}" as periodic task '
                             'to run each {interval} seconds via [cron] section'.format(name=name, interval=interval))
                service = make_service(mqttc=mqttc, name='mqttwarn.cron')
                ptlist[name] = PeriodicThread(callback=func, period=interval, name=name, srv=service, now=asbool(cron_options.get('now')))
                ptlist[name].start()
            except AttributeError:
                logger.error("[cron] section has function [%s] specified, but that's not defined" % name)
                continue


def cleanup(signum=None, frame=None):
    """
    Signal handler to ensure we disconnect cleanly
    in the event of a SIGTERM or SIGINT.

    Cancels the periodic tasks, disconnects from the broker, waits for
    the job queue to drain, raises the exit flag and finally exits the
    program through `sys.exit(signum)`.

    :param signum: Signal number, also used as exit status. May be None.
    :param frame: Unused.
    """
    global exit_flag

    for ptname in ptlist:
        logger.debug("Cancel %s timer" % ptname)
        ptlist[ptname].cancel()

    logger.debug("Disconnecting from MQTT broker...")
    # The MQTT client does not exist before `connect()` has created it.
    if mqttc is not None:
        if cf.lwt is not None:
            mqttc.publish(cf.lwt, LWTDEAD, qos=0, retain=True)
        mqttc.loop_stop()
        mqttc.disconnect()

    logger.info("Waiting for queue to drain")
    q_in.join()

    # Send exit signal to subsystems _after_ queue was drained
    exit_flag = True

    # `signum` may be None, so it must not be formatted as a number.
    logger.debug("Exiting on signal %s", signum)
    sys.exit(signum)


def bootstrap(config=None, scriptname=None):
    """
    Initialize the global configuration and runtime context objects.

    :param config: The configuration object.
    :param scriptname: Name of the calling program. When omitted,
                       the current `SCRIPTNAME` is retained.
    """
    # FIXME: Remove global variables
    global context, cf, SCRIPTNAME
    # NOTE: this is called before we connect to the MQTT broker, so mqttc is not initialised yet
    invoker = FunctionInvoker(config=config, srv=make_service(mqttc=None, name='mqttwarn.context'))
    context = RuntimeContext(config=config, invoker=invoker)
    cf = config
    if scriptname is not None:
        SCRIPTNAME = scriptname


def run_plugin(config=None, name=None, data=None):
    """
    Run service plugins directly without the
    dispatching and transformation machinery.

    On the one hand, this might look like a bit of a hack.
    On the other hand, it shows very clearly how some of
    the innards of mqttwarn interact so it might also please
    newcomers as a "learning the guts of mqttwarn" example.

    :param config: The configuration object
    :param name:   The name of the service plugin, e.g. "log" or "file"
    :param data:   The data to be converged into an appropriate item Struct object instance
    """

    # Bootstrap mqttwarn core
    # TODO: Optionally run w/o configuration.
    bootstrap(config=config)

    # Load designated service plugins
    load_services([name])
    srv = make_service(mqttc=None, name=_service_logger_name(name))

    # Build a mimikry item instance for feeding to the service plugin
    item = Struct(**data)
    # TODO: Read configuration optionally from data.
    item.config = config.config('config:' + name)
    item.service = srv
    item.target = 'mqttwarn'
    item.data = {}        # FIXME

    # Launch plugin
    module = service_plugins[name]['module']
    response = module.plugin(srv, item)
    logger.info('Plugin response: {}'.format(response))