1# Copyright 2009-2018 Oli Schacher 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# 15 16from __future__ import print_function 17 18import os 19import re 20import sys 21 22try: 23 import configparser 24except ImportError: 25 import ConfigParser as configparser 26import datetime 27import logging 28import threading 29from fuglu.threadpool import ThreadPool 30import fuglu.procpool 31import inspect 32import traceback 33import time 34import code 35import socket 36import multiprocessing 37from fuglu.shared import default_template_values, Suspect, HAVE_BEAUTIFULSOUP, BS_VERSION 38from fuglu.connectors.smtpconnector import SMTPServer 39from fuglu.connectors.milterconnector import MilterServer 40from fuglu.connectors.ncconnector import NCServer 41from fuglu.connectors.esmtpconnector import ESMTPServer 42from fuglu.localStringEncoding import force_uString, force_bString 43from fuglu.stats import StatsThread 44from fuglu.debug import ControlServer, CrashStore 45from fuglu import FUGLU_VERSION 46from fuglu.funkyconsole import FunkyConsole 47 48 49def check_version_status(lint=False): 50 """Check our version string in DNS for known issues and warn about them 51 52 the lookup should be <7 chars of commitid>.<patch>.<minor>.<major>.versioncheck.fuglu.org 53 in case of a release version, use 'release' instead of commit id 54 55 eg, the lookup for 0.6.3 would be: 56 release.3.6.0.versioncheck.fuglu.org 57 58 DNS will return NXDOMAIN or 127.0.0.<bitmask> 59 2: generic non security related issue 60 4: low risk security issue 61 8: high risk security issue 62 """ 63 bitmaskmap = { 64 2: "there is a known (not security related) issue with this version - consider upgrading", 65 4: "there is a known low-risk security issue with this version - an upgrade is recommended", 66 8: "there is a known high-risk security issue with this version - upgrade as soon as possible!", 67 } 68 69 m = re.match( 70 r'^(?P<major>\d{1,4})\.(?P<minor>\d{1,4})\.(?P<patch>\d{1,4})(?:\-(?P<commitno>\d{1,4})\-g(?P<commitid>[a-f0-9]{7}))?$', FUGLU_VERSION) 71 if m is None: 72 logging.warn("could not parse my version string %s" % FUGLU_VERSION) 73 return 74 parts = m.groupdict() 75 if 'commitid' not in parts or parts['commitid'] is None: 76 parts['commitid'] = 'release' 77 78 lookup = "{commitid}.{patch}.{minor}.{major}.versioncheck.fuglu.org".format(**parts) 79 result = None 80 try: 81 result = socket.gethostbyname(lookup) 82 except Exception: 83 # DNS fails happen - try again next time 84 pass 85 86 if result is None: 87 return 88 89 ret = re.match(r'^127\.0\.0\.(?P<replycode>\d{1,4})$', result) 90 if ret is not None: 91 code = int(ret.groupdict()['replycode']) 92 for bitmask, message in bitmaskmap.items(): 93 if code & bitmask == bitmask: 94 logging.warn(message) 95 if lint: 96 fc = FunkyConsole() 97 print(fc.strcolor(message, "yellow")) 98 99 100class MainController(object): 101 102 """main class to startup and control the app""" 103 plugins = [] 104 prependers = [] 105 appenders = [] 106 config = None 107 108 def __init__(self, config, logQueue=None, logProcessFacQueue=None): 109 """ 110 Main controller instance 111 Note: The logQueue and logProcessFacQueue keyword args are only needed in the fuglu main process when logging 112 to files. For default logging to the screen there is not logQueue needed. 113 114 Args: 115 config (configparser.RawConfigParser()): Config file parser (file already read) 116 117 Keyword Args: 118 logQueue (multiprocessing.queue or None): Queue where to put log messages (not directly used, only by loggers as defined in logtools.client_configurer) 119 logProcessFacQueue (multiprocessing.queue or None): Queue where to put new logging configurations (logtools.logConfig objects) 120 """ 121 self.requiredvars = { 122 # main section 123 'identifier': { 124 'section': 'main', 125 'description': """identifier can be any string that helps you identifying your config file\nthis helps making sure the correct config is loaded. this identifier will be printed out when fuglu is reloading its config""", 126 'default': 'dist', 127 }, 128 129 'daemonize': { 130 'section': 'main', 131 'description': "run as a daemon? (fork)", 132 'default': "1", 133 }, 134 135 'user': { 136 'section': 'main', 137 'description': "run as user", 138 'default': "nobody", 139 }, 140 141 'group': { 142 'section': 'main', 143 'description': "run as group", 144 'default': "nobody", 145 }, 146 147 'plugindir': { 148 'section': 'main', 149 'description': "comma separated list of directories in which fuglu searches for additional plugins and their dependencies", 150 'default': "", 151 }, 152 153 'plugins': { 154 'section': 'main', 155 'description': "what SCANNER plugins do we load, comma separated", 156 'default': "archive,attachment,clamav,spamassassin", 157 }, 158 159 'prependers': { 160 'section': 'main', 161 'description': "what PREPENDER plugins do we load, comma separated", 162 'default': "debug,skip", 163 }, 164 165 'appenders': { 166 'section': 'main', 167 'description': "what APPENDER plugins do we load, comma separated\nappender plugins are plugins run after the scanning plugins\nappenders will always be run, even if a a scanner plugin decided to delete/bounce/whatever a message\n(unless a mail is deferred in which case running the appender would not make sense as it will come again)", 168 'default': "", 169 }, 170 171 'bindaddress': { 172 'section': 'main', 173 'description': "address fuglu should listen on. usually 127.0.0.1 so connections are accepted from local host only", 174 'default': "127.0.0.1", 175 }, 176 177 'incomingport': { 178 'section': 'main', 179 'description': "incoming port(s) (postfix connects here)\nyou can use multiple comma separated ports here\nf.ex. to separate incoming and outgoing mail and a special port for debugging messages\n10025: standard incoming mail\n10099: outgoing mail\n10888: debug port", 180 'default': "10025,10099,10888", 181 }, 182 183 'outgoinghost': { 184 'section': 'main', 185 'description': "outgoing hostname/ip where postfix is listening for re-injects.\nuse ${injecthost} to connect back to the IP where the incoming connection came from", 186 'default': "127.0.0.1", 187 }, 188 189 'outgoingport': { 190 'section': 'main', 191 'description': "outgoing port where postfix is listening for re-injects)", 192 'default': "10026", 193 }, 194 195 'outgoinghelo': { 196 'section': 'main', 197 'description': "#outgoing helo we should use for re-injects\nleave empty to auto-detect current hostname", 198 'default': "", 199 }, 200 201 'tempdir': { 202 'section': 'main', 203 'description': "temp dir where fuglu can store messages while scanning", 204 'default': "/tmp", 205 }, 206 207 'prependaddedheaders': { 208 'section': 'main', 209 'description': "String to prepend to added headers", 210 'default': "X-Fuglu-", 211 }, 212 213 'trashdir': { 214 'section': 'main', 215 'description': "If a plugin decides to delete a message, save a copy here\ndefault empty, eg. do not save a backup copy", 216 'default': "", 217 }, 218 219 'trashlog': { 220 'section': 'main', 221 'description': "list all deleted messages in 00-fuglutrash.log in the trashdir", 222 'default': "0", 223 }, 224 225 'disablebounces': { 226 'section': 'main', 227 'description': "if this is set to True/1/yes , no Bounces will be sent from Fuglu eg. after a blocked attachment has been detected\nThis may be used for debugging/testing to make sure fuglu can not produce backscatter", 228 'default': "0", 229 }, 230 231 'debuginfoheader': { 232 'section': 'main', 233 'description': "write debug info header to every mail", 234 'default': "0", 235 }, 236 237 'spamstatusheader': { 238 'section': 'main', 239 'description': "write a Spamstatus YES/NO header", 240 'default': "1", 241 }, 242 243 'suspectidheader': { 244 'section': 'main', 245 'description': "write suspect ID to every mail", 246 'default': "1", 247 }, 248 249 'mrtgdir': { 250 'section': 'main', 251 'description': "write mrtg statistics", 252 'default': "", 253 }, 254 255 'controlport': { 256 'section': 'main', 257 'description': "port where fuglu provides statistics etc (used by fuglu_control). Can also be a path to a unix socket", 258 'default': "/tmp/fuglu_control.sock", 259 }, 260 261 'logtemplate': { 262 'section': 'main', 263 'description': "Log pattern to use for all suspects in fuglu log. set empty string to disable logging generic suspect info. Supports the usual template variables plus: ${size}, ${spam} ${highspam}, ${modified} ${decision} ${tags} (short tags representagion) ${fulltags} full tags output, ${decision}", 264 'default': 'Suspect ${id} from=${from_address} to=${to_address} size=${size} spam=${spam} virus=${virus} modified=${modified} decision=${decision}', 265 }, 266 267 'versioncheck': { 268 'section': 'main', 269 'description': "warn about known severe problems/security issues of current version.\nNote: This performs a DNS lookup of gitrelease.patchlevel.minorversion.majorversion.versioncheck.fuglu.org on startup and fuglu --lint.\nNo other information of any kind is transmitted to outside systems.\nDisable this if you consider the DNS lookup an unwanted information leak.", 270 'default': '1', 271 }, 272 273 # performance section 274 'minthreads': { 275 'default': "2", 276 'section': 'performance', 277 'description': 'minimum scanner threads', 278 }, 279 'maxthreads': { 280 'default': "40", 281 'section': 'performance', 282 'description': 'maximum scanner threads', 283 }, 284 'backend': { 285 'default': "thread", 286 'section': 'performance', 287 'description': "Method for parallelism, either 'thread' or 'process' ", 288 }, 289 'initialprocs': { 290 'default': "0", 291 'section': 'performance', 292 'description': "Initial number of processes when backend='process'. If 0 (the default), automatically selects twice the number of available virtual cores. Despite its 'initial'-name, this number currently is not adapted automatically.", 293 }, 294 295 # spam section 296 'defaultlowspamaction': { 297 'default': "DUNNO", 298 'section': 'spam', 299 'description': """what to do with messages that plugins think are spam but not so sure ("low spam")\nin normal usage you probably never set this something other than DUNNO\nthis is a DEFAULT action, eg. anti spam plugins should take this if you didn't set \n a individual override""", 300 }, 301 302 'defaulthighspamaction': { 303 'default': "DUNNO", 304 'section': 'spam', 305 'description': """what to do with messages if a plugin is sure it is spam ("high spam") \nin after-queue mode this is probably still DUNNO or maybe DELETE for courageous people\nthis is a DEFAULT action, eg. anti spam plugins should take this if you didn't set\n a individual override """, 306 }, 307 308 # virus section 309 'defaultvirusaction': { 310 'default': "DELETE", 311 'section': 'virus', 312 'description': """#what to do with messages if a plugin detects a virus\nin after-queue mode this should probably be DELETE\nin pre-queue mode you could use REJECT\nthis is a DEFAULT action, eg. anti-virus plugins should take this if you didn't set \n a individual override""", 313 }, 314 315 # smtpconnector 316 'requeuetemplate': { 317 'default': "FUGLU REQUEUE(${id}): ${injectanswer}", 318 'section': 'smtpconnector', 319 'description': """confirmation template sent back to the connecting postfix for accepted messages""", 320 }, 321 322 # esmtpconnector 323 'queuetemplate': { 324 'default': "${injectanswer}", 325 'section': 'esmtpconnector', 326 'description': """confirmation template sent back to the connecting client for accepted messages""", 327 }, 328 'ignore_multiple_recipients': { 329 'default': "0", 330 'section': 'esmtpconnector', 331 'description': """only deliver the message to the first recipient, ignore the others. This is useful in spamtrap setups where we don't want to create duplicate deliveries.""", 332 }, 333 334 # databaseconfig 335 'dbconnectstring': { 336 'default': "", 337 'section': 'databaseconfig', 338 'description': """read runtime configuration values from a database. requires sqlalchemy to be installed""", 339 'confidential': True, 340 }, 341 342 'sql': { 343 'default': """SELECT value FROM fugluconfig WHERE `section`=:section AND `option`=:option AND `scope` IN ('$GLOBAL',CONCAT('%',:to_domain),:to_address) ORDER BY `scope` DESC""", 344 'section': 'databaseconfig', 345 'description': """sql query that returns a configuration value override. sql placeholders are ':section',':option' in addition the usual suspect filter default values like ':to_domain', ':to_address' etc\nif the statement returns more than one row/value only the first value in the first row is used""", 346 }, 347 348 # environment 349 'boundarydistance': { 350 'default': "0", 351 'section': 'environment', 352 'description': """Distance to the boundary MTA ("how many received headers should fuglu skip to determine the last untrusted host information"). Only required if plugins need to have information about the last untrusted host(SPFPlugin)""", 353 }, 354 'trustedhostsregex': { 355 'default': "", 356 'section': 'environment', 357 'description': """Optional regex that should be applied to received headers to skip trusted (local) mta helo/ip/reverse dns.\nOnly required if plugins need to have information about the last untrusted host and the message doesn't pass a fixed amount of hops to reach this system in your network""", 358 }, 359 360 # plugin alias 361 'debug': { 362 'default': "fuglu.plugins.p_debug.MessageDebugger", 363 'section': 'PluginAlias', 364 }, 365 366 'skip': { 367 'default': "fuglu.plugins.p_skipper.PluginSkipper", 368 'section': 'PluginAlias', 369 }, 370 371 'fraction': { 372 'default': "fuglu.plugins.p_fraction.PluginFraction", 373 'section': 'PluginAlias', 374 }, 375 376 'archive': { 377 'default': "fuglu.plugins.archive.ArchivePlugin", 378 'section': 'PluginAlias', 379 }, 380 381 'attachment': { 382 'default': "fuglu.plugins.attachment.FiletypePlugin", 383 'section': 'PluginAlias', 384 }, 385 386 'clamav': { 387 'default': "fuglu.plugins.clamav.ClamavPlugin", 388 'section': 'PluginAlias', 389 }, 390 391 'spamassassin': { 392 'default': "fuglu.plugins.sa.SAPlugin", 393 'section': 'PluginAlias', 394 }, 395 396 'vacation': { 397 'default': "fuglu.plugins.vacation.VacationPlugin", 398 'section': 'PluginAlias', 399 }, 400 401 'actionoverride': { 402 'default': "fuglu.plugins.actionoverride.ActionOverridePlugin", 403 'section': 'PluginAlias', 404 }, 405 406 'icap': { 407 'default': "fuglu.plugins.icap.ICAPPlugin", 408 'section': 'PluginAlias', 409 }, 410 411 'sssp': { 412 'default': "fuglu.plugins.sssp.SSSPPlugin", 413 'section': 'PluginAlias', 414 }, 415 416 'fprot': { 417 'default': "fuglu.plugins.fprot.FprotPlugin", 418 'section': 'PluginAlias', 419 }, 420 421 'scriptfilter': { 422 'default': "fuglu.plugins.script.ScriptFilter", 423 'section': 'PluginAlias', 424 }, 425 426 'dkimsign': { 427 'default': "fuglu.plugins.domainauth.DKIMSignPlugin", 428 'section': 'PluginAlias', 429 }, 430 431 'dkimverify': { 432 'default': "fuglu.plugins.domainauth.DKIMVerifyPlugin", 433 'section': 'PluginAlias', 434 }, 435 436 'spf': { 437 'default': "fuglu.plugins.domainauth.SPFPlugin", 438 'section': 'PluginAlias', 439 }, 440 } 441 442 self.config = config 443 self.servers = [] 444 self.logger = self._logger() 445 self.stayalive = True 446 self.threadpool = None 447 self.procpool = None 448 self.controlserver = None 449 self.started = datetime.datetime.now() 450 self.statsthread = None 451 self.debugconsole = False 452 self._logQueue = logQueue 453 self._logProcessFacQueue = logProcessFacQueue 454 self.configFileUpdates = None 455 self.logConfigFileUpdates = None 456 457 @property 458 def logQueue(self): 459 return self._logQueue 460 461 @property 462 def logProcessFacQueue(self): 463 return self._logProcessFacQueue 464 465 @logProcessFacQueue.setter 466 def logProcessFacQueue(self, lProc): 467 self._logProcessFacQueue = lProc 468 469 def _logger(self): 470 myclass = self.__class__.__name__ 471 loggername = "fuglu.%s" % (myclass,) 472 return logging.getLogger(loggername) 473 474 def start_connector(self, portspec): 475 port = portspec.strip() 476 protocol = 'smtp' 477 478 if port.find(':') > 0: 479 protocol, port = port.split(':') 480 481 self.logger.info("starting connector %s/%s" % (protocol, port)) 482 try: 483 port = int(port) 484 if protocol == 'smtp': 485 smtpserver = SMTPServer( 486 self, port=port, address=self.config.get('main', 'bindaddress')) 487 tr = threading.Thread(target=smtpserver.serve, args=()) 488 tr.daemon = True 489 tr.start() 490 self.servers.append(smtpserver) 491 elif protocol == 'esmtp': 492 esmtpserver = ESMTPServer( 493 self, port=port, address=self.config.get('main', 'bindaddress')) 494 tr = threading.Thread(target=esmtpserver.serve, args=()) 495 tr.daemon = True 496 tr.start() 497 self.servers.append(esmtpserver) 498 elif protocol == 'milter': 499 milterserver = MilterServer( 500 self, port=port, address=self.config.get('main', 'bindaddress')) 501 tr = threading.Thread(target=milterserver.serve, args=()) 502 tr.daemon = True 503 tr.start() 504 self.servers.append(milterserver) 505 elif protocol == 'netcat': 506 ncserver = NCServer( 507 self, port=port, address=self.config.get('main', 'bindaddress')) 508 tr = threading.Thread(target=ncserver.serve, args=()) 509 tr.daemon = True 510 tr.start() 511 self.servers.append(ncserver) 512 else: 513 self.logger.error( 514 'Unknown Interface Protocol: %s, ignoring server on port %s' % (protocol, port)) 515 except Exception as e: 516 self.logger.error( 517 "could not start connector %s/%s : %s" % (protocol, port, str(e))) 518 519 def _start_stats_thread(self): 520 self.logger.info("Init Stat Engine") 521 statsthread = StatsThread(self.config) 522 mrtg_stats_thread = threading.Thread(name='MRTG-Statswriter', target=statsthread.writestats, args=()) 523 mrtg_stats_thread.daemon = True 524 mrtg_stats_thread.start() 525 return statsthread 526 527 def _start_threadpool(self): 528 self.logger.info("Init Threadpool") 529 try: 530 minthreads = self.config.getint('performance', 'minthreads') 531 maxthreads = self.config.getint('performance', 'maxthreads') 532 except configparser.NoSectionError: 533 self.logger.warning('Performance section not configured, using default thread numbers') 534 minthreads = 1 535 maxthreads = 3 536 537 queuesize = maxthreads * 10 538 return ThreadPool(minthreads, maxthreads, queuesize) 539 540 def _start_processpool(self): 541 numprocs = self.config.getint('performance','initialprocs') 542 if numprocs < 1: 543 numprocs = multiprocessing.cpu_count() *2 544 self.logger.info("Init process pool with %s worker processes"%(numprocs)) 545 pool = fuglu.procpool.ProcManager(self._logQueue, numprocs = numprocs, config = self.config) 546 return pool 547 548 def _start_connectors(self): 549 self.logger.info("Starting interface sockets...") 550 ports = self.config.get('main', 'incomingport') 551 for port in ports.split(','): 552 self.start_connector(port) 553 554 def _start_control_server(self): 555 control = ControlServer(self, address=self.config.get( 556 'main', 'bindaddress'), port=self.config.get('main', 'controlport')) 557 ctrl_server_thread = threading.Thread( 558 name='Control server', target=control.serve, args=()) 559 ctrl_server_thread.daemon = True 560 ctrl_server_thread.start() 561 return control 562 563 def _run_main_loop(self): 564 if self.debugconsole: 565 self.run_debugconsole() 566 else: 567 if self.config.getboolean('main', 'versioncheck'): 568 # log possible issues with this version 569 check_version_status() 570 571 # mainthread dummy loop 572 while self.stayalive: 573 try: 574 time.sleep(1) 575 except KeyboardInterrupt: 576 self.stayalive = False 577 578 def startup(self): 579 self.load_extensions() 580 ok = self.load_plugins() 581 if not ok: 582 sys.stderr.write( 583 "Some plugins failed to load, please check the logs. Aborting.\n") 584 self.logger.info('Fuglu shut down after fatal error condition') 585 sys.exit(1) 586 587 self.statsthread = self._start_stats_thread() 588 backend = self.config.get('performance','backend') 589 if backend == 'process': 590 self.procpool = self._start_processpool() 591 else: # default backend is 'thread' 592 self.threadpool = self._start_threadpool() 593 594 self._start_connectors() 595 self.controlserver = self._start_control_server() 596 597 self.logger.info('Startup complete') 598 self._run_main_loop() 599 self.shutdown() 600 601 def run_debugconsole(self): 602 from fuglu.shared import DUNNO, ACCEPT, DELETE, REJECT, DEFER, Suspect 603 604 # do not import readline at the top, it will cause undesired output, for example when generating the default config 605 # http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import 606 import readline 607 608 print("Fuglu Interactive Console started") 609 print("") 610 print("pre-defined locals:") 611 612 mc = self 613 print("mc : maincontroller") 614 615 terp = code.InteractiveConsole(locals()) 616 terp.interact("") 617 618 def run_netconsole(self, port=1337, address="0.0.0.0"): 619 """start a network console""" 620 old_stdin = sys.stdin 621 old_stdout = sys.stdout 622 old_stderr = sys.stderr 623 624 addr_f = socket.getaddrinfo(address, 0)[0][0] 625 626 serversocket = socket.socket(addr_f) 627 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 628 serversocket.bind((address, port)) 629 serversocket.listen(1) 630 clientsocket, _ = serversocket.accept() # client socket 631 self.logger.info("Interactive python connection from %s/%s" % (address, address)) 632 633 class sw(object): # socket wrapper 634 def __init__(self, s): 635 self.s = s 636 637 def read(self, length): 638 return force_uString(self.s.recv(length)) 639 640 def write(self, st): 641 return self.s.send(force_bString(st)) 642 643 def readline(self): 644 return self.read(256) 645 sw = sw(clientsocket) 646 sys.stdin = sw 647 sys.stdout = sw 648 sys.stderr = sw 649 mc = self 650 terp = code.InteractiveConsole(locals()) 651 try: 652 terp.interact("Fuglu Python Shell - MainController available as 'mc'") 653 except Exception: 654 pass 655 self.logger.info("done talking to %s - closing interactive shell on %s/%s" % (address, address, port)) 656 sys.stdin = old_stdin 657 sys.stdout = old_stdout 658 sys.stderr = old_stderr 659 try: 660 clientsocket.close() 661 except Exception as e: 662 self.logger.warning("Failed to close shell client socket: %s" % str(e)) 663 try: 664 serversocket.close() 665 except Exception as e: 666 self.logger.warning("Failed to close shell server socket: %s" % str(e)) 667 668 def reload(self): 669 """apply config changes""" 670 self.logger.info('Applying configuration changes...') 671 672 backend = self.config.get('performance','backend') 673 674 if backend == 'thread': 675 if self.threadpool is not None: 676 minthreads = self.config.getint('performance', 'minthreads') 677 maxthreads = self.config.getint('performance', 'maxthreads') 678 679 # threadpool changes? 680 if self.threadpool.minthreads != minthreads or self.threadpool.maxthreads != maxthreads: 681 self.logger.info('Threadpool config changed, initialising new threadpool') 682 currentthreadpool = self.threadpool 683 self.threadpool = self._start_threadpool() 684 currentthreadpool.stayalive = False 685 else: 686 self.logger.info('Keep existing threadpool') 687 else: 688 self.logger.info('Create new threadpool') 689 self.threadpool = self._start_threadpool() 690 691 # stop existing procpool 692 if self.procpool is not None: 693 self.logger.info('Delete old procpool') 694 self.procpool.shutdown() 695 self.procpool = None 696 697 elif backend == 'process': 698 # start new procpool 699 currentProcPool = self.procpool 700 self.logger.info('Create new processpool') 701 self.procpool = self._start_processpool() 702 703 # stop existing procpool 704 # -> the procpool has to be recreated to take configuration changes 705 # into account (each worker process has its own controller unlike using threadpool) 706 if currentProcPool is not None: 707 self.logger.info('Delete old processpool') 708 currentProcPool.shutdown() 709 710 # stop existing threadpool 711 if self.threadpool is not None: 712 self.logger.info('Delete old threadpool') 713 self.threadpool.stayalive = False 714 self.threadpool = None 715 else: 716 self.logger.error('backend not detected -> ignoring input!') 717 718 # smtp engine changes? 719 ports = self.config.get('main', 'incomingport') 720 portspeclist = ports.split(',') 721 portlist = [] 722 723 for portspec in portspeclist: 724 if portspec.find(':') > 0: 725 (protocol, port) = portspec.split(':') 726 port = int(port) 727 else: 728 port = int(portspec) 729 portlist.append(port) 730 alreadyRunning = False 731 for serv in self.servers: 732 if serv.port == port: 733 alreadyRunning = True 734 break 735 736 if not alreadyRunning: 737 self.logger.info('start new connector at %s' % str(portspec)) 738 self.start_connector(portspec) 739 else: 740 self.logger.info('keep connector at %s' % str(portspec)) 741 742 servercopy = self.servers[:] 743 for serv in servercopy: 744 if serv.port not in portlist: 745 self.logger.info('Closing server socket on port %s' % serv.port) 746 serv.shutdown() 747 self.servers.remove(serv) 748 else: 749 self.logger.info('Keep server socket on port %s' % serv.port) 750 751 self.logger.info('Config changes applied') 752 753 def shutdown(self): 754 if self.statsthread: 755 self.statsthread.stayalive = False 756 for server in self.servers: 757 self.logger.info('Closing server socket on port %s' % server.port) 758 server.shutdown() 759 760 if self.controlserver is not None: 761 self.controlserver.shutdown() 762 763 if self.threadpool: 764 self.threadpool.stayalive = False 765 if self.procpool: 766 self.procpool.stayalive = False 767 768 self.stayalive = False 769 self.logger.info('Shutdown complete') 770 self.logger.info('Remaining threads: %s' % threading.enumerate()) 771 772 def _lint_dependencies(self, fc): 773 print(fc.strcolor('Checking dependencies...', 'magenta')) 774 try: 775 import sqlalchemy 776 print(fc.strcolor('sqlalchemy: Version %s installed' % sqlalchemy.__version__, 'green')) 777 except: 778 print(fc.strcolor('sqlalchemy: not installed', 'yellow') + 779 " Optional dependency, required if you want to enable any database lookups") 780 781 if HAVE_BEAUTIFULSOUP: 782 print( 783 fc.strcolor('BeautifulSoup: V%s installed' % BS_VERSION, 'green')) 784 else: 785 print(fc.strcolor('BeautifulSoup: not installed', 'yellow') + 786 " Optional dependency, this improves accuracy for stripped body searches in filters - not required with a default config") 787 788 try: 789 import magic 790 791 if hasattr(magic, 'open'): 792 magic_vers = "python-file/libmagic bindings (http://www.darwinsys.com/file/)" 793 print(fc.strcolor('magic: found %s' % magic_vers, 'green')) 794 elif hasattr(magic, 'from_buffer'): 795 magic_vers = "python-magic (https://github.com/ahupp/python-magic)" 796 print(fc.strcolor('magic: found %s' % magic_vers, 'green')) 797 else: 798 print(fc.strcolor('magic: unsupported version', 'yellow') + 799 " File type detection requires either the python bindings from http://www.darwinsys.com/file/ or python magic from https://github.com/ahupp/python-magic") 800 except: 801 print(fc.strcolor('magic: not installed', 'yellow') + 802 " Optional dependency, without python-file or python-magic the attachment plugin's automatic file type detection will easily be fooled") 803 804 def lint(self): 805 errors = 0 806 fc = FunkyConsole() 807 self._lint_dependencies(fc) 808 809 print(fc.strcolor('Loading extensions...', 'magenta')) 810 exts = self.load_extensions() 811 for ext in exts: 812 (name, enabled, status) = ext 813 pname = fc.strcolor(name, 'cyan') 814 if enabled: 815 penabled = fc.strcolor('enabled', 'green') 816 else: 817 penabled = fc.strcolor('disabled', 'red') 818 print("%s: %s (%s)" % (pname, penabled, status)) 819 820 print(fc.strcolor('Loading plugins...', 'magenta')) 821 if not self.load_plugins(): 822 print(fc.strcolor('At least one plugin failed to load', 'red')) 823 print(fc.strcolor('Plugin loading complete', 'magenta')) 824 825 print("Linting ", fc.strcolor("main configuration", 'cyan')) 826 if not self.checkConfig(): 827 print(fc.strcolor("ERROR", "red")) 828 else: 829 print(fc.strcolor("OK", "green")) 830 831 trashdir = self.config.get('main', 'trashdir').strip() 832 if trashdir != "" and not os.path.isdir(trashdir): 833 print(fc.strcolor("Trashdir %s does not exist" % trashdir, 'red')) 834 835 # sql config override 836 sqlconfigdbconnectstring = self.config.get('databaseconfig', 'dbconnectstring') 837 if sqlconfigdbconnectstring.strip() != '': 838 print() 839 print("Linting ", fc.strcolor("sql configuration", 'cyan')) 840 try: 841 from fuglu.extensions.sql import get_session 842 sess = get_session(sqlconfigdbconnectstring) 843 tempsuspect = Suspect( 844 'sender@example.com', 'recipient@example.com', '/dev/null') 845 sqlvars = dict( 846 section='testsection', option='testoption', scope='$GLOBAL') 847 default_template_values(tempsuspect, sqlvars) 848 sess.execute(self.config.get('databaseconfig', 'sql'), sqlvars) 849 sess.remove() 850 print(fc.strcolor("OK", 'green')) 851 except Exception as e: 852 print(fc.strcolor("Failed %s" % str(e), 'red')) 853 854 allplugins = self.plugins + self.prependers + self.appenders 855 856 for plugin in allplugins: 857 print() 858 print("Linting Plugin ", fc.strcolor(str(plugin), 'cyan'), 859 'Config section:', fc.strcolor(str(plugin.section), 'cyan')) 860 try: 861 result = plugin.lint() 862 except Exception as e: 863 CrashStore.store_exception() 864 print("ERROR: %s" % e) 865 result = False 866 867 if result: 868 print(fc.strcolor("OK", "green")) 869 else: 870 errors = errors + 1 871 print(fc.strcolor("ERROR", "red")) 872 print("%s plugins reported errors." % errors) 873 874 if self.config.getboolean('main', 'versioncheck'): 875 check_version_status(lint=True) 876 877 def propagate_defaults(self, requiredvars, config, defaultsection=None): 878 """propagate defaults from requiredvars if they are missing in config""" 879 for option, infodic in requiredvars.items(): 880 if 'section' in infodic: 881 section = infodic['section'] 882 else: 883 section = defaultsection 884 885 default = infodic['default'] 886 887 if not config.has_section(section): 888 config.add_section(section) 889 890 if not config.has_option(section, option): 891 config.set(section, option, default) 892 893 def propagate_core_defaults(self): 894 """check for missing core config options and try to fill them with defaults 895 must be called before we can do plugin loading stuff 896 """ 897 self.propagate_defaults(self.requiredvars, self.config, 'main') 898 899 def propagate_plugin_defaults(self): 900 """propagate defaults from loaded lugins""" 901 #plugins, prependers, appenders 902 allplugs = self.plugins + self.prependers + self.appenders 903 for plug in allplugs: 904 if hasattr(plug, 'requiredvars'): 905 requiredvars = getattr(plug, 'requiredvars') 906 if type(requiredvars) == dict: 907 self.propagate_defaults(requiredvars, self.config, plug.section) 908 909 def checkConfig(self): 910 """Check if all requred options are in the config file 911 Fill missing values with defaults if possible 912 """ 913 allOK = True 914 for config, infodic in self.requiredvars.items(): 915 section = infodic['section'] 916 try: 917 var = self.config.get(section, config) 918 919 if 'validator' in infodic and not infodic["validator"](var): 920 print("Validation failed for [%s] :: %s" % (section, config)) 921 allOK = False 922 923 except configparser.NoSectionError: 924 print("Missing configuration section [%s] :: %s" % (section, config)) 925 allOK = False 926 except configparser.NoOptionError: 927 print("Missing configuration value [%s] :: %s" % (section, config)) 928 allOK = False 929 return allOK 930 931 def load_extensions(self): 932 """load fuglu extensions""" 933 ret = [] 934 import fuglu.extensions 935 for extension in fuglu.extensions.__all__: 936 mod = __import__('fuglu.extensions.%s' % extension) 937 ext = getattr(mod, 'extensions') 938 fl = getattr(ext, extension) 939 enabled = getattr(fl, 'ENABLED') 940 status = getattr(fl, 'STATUS') 941 name = getattr(fl, '__name__') 942 ret.append((name, enabled, status)) 943 return ret 944 945 def get_component_by_alias(self, pluginalias): 946 """Returns the full plugin component from an alias. if this alias is not configured, return the original string""" 947 if not self.config.has_section('PluginAlias'): 948 return pluginalias 949 950 if not self.config.has_option('PluginAlias', pluginalias): 951 return pluginalias 952 953 return self.config.get('PluginAlias', pluginalias) 954 955 def load_plugins(self): 956 """load plugins defined in config""" 957 allOK = True 958 plugindirs = self.config.get('main', 'plugindir').strip().split(',') 959 for plugindir in plugindirs: 960 if os.path.isdir(plugindir): 961 self.logger.debug('Searching for additional plugins in %s' % plugindir) 962 if plugindir not in sys.path: 963 sys.path.insert(0, plugindir) 964 else: 965 self.logger.warning('Plugin directory %s not found' % plugindir) 966 967 self.logger.debug('Module search path %s' % sys.path) 968 self.logger.debug('Loading scanner plugins') 969 newplugins, loadok = self._load_all(self.config.get('main', 'plugins')) 970 if not loadok: 971 allOK = False 972 973 newprependers, loadok = self._load_all( 974 self.config.get('main', 'prependers')) 975 if not loadok: 976 allOK = False 977 978 newappenders, loadok = self._load_all( 979 self.config.get('main', 'appenders')) 980 if not loadok: 981 allOK = False 982 983 if allOK: 984 self.plugins = newplugins 985 self.prependers = newprependers 986 self.appenders = newappenders 987 self.propagate_plugin_defaults() 988 989 return allOK 990 991 def _load_all(self, configstring): 992 """load all plugins from config string. returns tuple ([list of loaded instances],allOk)""" 993 pluglist = [] 994 config_re = re.compile( 995 """^(?P<structured_name>[a-zA-Z0-9\.\_\-]+)(?:\((?P<config_override>[a-zA-Z0-9\.\_\-]+)\))?$""") 996 allOK = True 997 plugins = configstring.split(',') 998 for plug in plugins: 999 if plug == "": 1000 continue 1001 m = config_re.match(plug) 1002 if m is None: 1003 self.logger.error('Invalid Plugin Syntax: %s' % plug) 1004 allOK = False 1005 continue 1006 structured_name, configoverride = m.groups() 1007 structured_name = self.get_component_by_alias(structured_name) 1008 try: 1009 plugininstance = self._load_component( 1010 structured_name, configsection=configoverride) 1011 pluglist.append(plugininstance) 1012 except (configparser.NoSectionError, configparser.NoOptionError): 1013 CrashStore.store_exception() 1014 self.logger.error("The plugin %s is accessing the config in __init__ -> can not load default values" % structured_name) 1015 except Exception as e: 1016 CrashStore.store_exception() 1017 self.logger.error('Could not load plugin %s : %s' % 1018 (structured_name, e)) 1019 exc = traceback.format_exc() 1020 self.logger.error(exc) 1021 allOK = False 1022 1023 return pluglist, allOK 1024 1025 def _load_component(self, structured_name, configsection=None): 1026 # from: 1027 # http://mail.python.org/pipermail/python-list/2003-May/204392.html 1028 component_names = structured_name.split('.') 1029 mod = __import__('.'.join(component_names[:-1])) 1030 for component_name in component_names[1:]: 1031 mod = getattr(mod, component_name) 1032 1033 if configsection is None: 1034 plugininstance = mod(self.config) 1035 else: 1036 # check if plugin supports config override 1037 if 'section' in inspect.getargspec(mod.__init__)[0]: 1038 plugininstance = mod(self.config, section=configsection) 1039 else: 1040 raise Exception('Cannot set Config Section %s : Plugin %s does not support config override' % ( 1041 configsection, mod)) 1042 return plugininstance 1043 1044