1#   Copyright 2009-2018 Oli Schacher
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16from __future__ import print_function
17
18import os
19import re
20import sys
21
22try:
23    import configparser
24except ImportError:
25    import ConfigParser as configparser
26import datetime
27import logging
28import threading
29from fuglu.threadpool import ThreadPool
30import fuglu.procpool
31import inspect
32import traceback
33import time
34import code
35import socket
36import multiprocessing
37from fuglu.shared import default_template_values, Suspect, HAVE_BEAUTIFULSOUP, BS_VERSION
38from fuglu.connectors.smtpconnector import SMTPServer
39from fuglu.connectors.milterconnector import MilterServer
40from fuglu.connectors.ncconnector import NCServer
41from fuglu.connectors.esmtpconnector import ESMTPServer
42from fuglu.localStringEncoding import force_uString, force_bString
43from fuglu.stats import StatsThread
44from fuglu.debug import ControlServer, CrashStore
45from fuglu import FUGLU_VERSION
46from fuglu.funkyconsole import FunkyConsole
47
48
def check_version_status(lint=False):
    """Check our version string in DNS for known issues and warn about them.

    The lookup is <7 chars of commitid>.<patch>.<minor>.<major>.versioncheck.fuglu.org;
    for a release version 'release' is used instead of the commit id.

    eg, the lookup for 0.6.3 would be:
    release.3.6.0.versioncheck.fuglu.org

    DNS will return NXDOMAIN or 127.0.0.<bitmask>
    2: generic non security related issue
    4: low risk security issue
    8: high risk security issue

    Args:
        lint (bool): if True, every warning is additionally printed to stdout in yellow

    Returns:
        None. An unparseable version string is logged as a warning; a failed DNS
        lookup or an unexpected answer is silently ignored.
    """
    bitmaskmap = {
        2: "there is a known (not security related) issue with this version - consider upgrading",
        4: "there is a known low-risk security issue with this version - an upgrade is recommended",
        8: "there is a known high-risk security issue with this version - upgrade as soon as possible!",
    }

    m = re.match(
        r'^(?P<major>\d{1,4})\.(?P<minor>\d{1,4})\.(?P<patch>\d{1,4})(?:\-(?P<commitno>\d{1,4})\-g(?P<commitid>[a-f0-9]{7}))?$', FUGLU_VERSION)
    if m is None:
        # logging.warn() is a deprecated alias that no longer exists in current Python
        logging.warning("could not parse my version string %s", FUGLU_VERSION)
        return
    parts = m.groupdict()
    # groupdict() always contains the key; it is None when the optional group did not match
    if parts.get('commitid') is None:
        parts['commitid'] = 'release'

    lookup = "{commitid}.{patch}.{minor}.{major}.versioncheck.fuglu.org".format(**parts)
    try:
        result = socket.gethostbyname(lookup)
    except Exception:
        # DNS fails happen (NXDOMAIN is the normal "no known issue" answer) - try again next time
        return

    ret = re.match(r'^127\.0\.0\.(?P<replycode>\d{1,4})$', result)
    if ret is None:
        return

    # named replycode so the imported 'code' module is not shadowed
    replycode = int(ret.group('replycode'))
    for bitmask, message in bitmaskmap.items():
        if replycode & bitmask == bitmask:
            logging.warning(message)
            if lint:
                fc = FunkyConsole()
                print(fc.strcolor(message, "yellow"))
98
99
100class MainController(object):
101
102    """main class to startup and control the app"""
103    plugins = []
104    prependers = []
105    appenders = []
106    config = None
107
108    def __init__(self, config, logQueue=None, logProcessFacQueue=None):
109        """
110        Main controller instance
111        Note: The logQueue and logProcessFacQueue keyword args are only needed in the fuglu main process when logging
112              to files. For default logging to the screen there is not logQueue needed.
113
114        Args:
115            config (configparser.RawConfigParser()): Config file parser (file already read)
116
117        Keyword Args:
118            logQueue (multiprocessing.queue or None): Queue where to put log messages (not directly used, only by loggers as defined in logtools.client_configurer)
119            logProcessFacQueue (multiprocessing.queue or None): Queue where to put new logging configurations (logtools.logConfig objects)
120        """
121        self.requiredvars = {
122            # main section
123            'identifier': {
124                'section': 'main',
125                'description': """identifier can be any string that helps you identifying your config file\nthis helps making sure the correct config is loaded. this identifier will be printed out when fuglu is reloading its config""",
126                'default': 'dist',
127            },
128
129            'daemonize': {
130                'section': 'main',
131                'description': "run as a daemon? (fork)",
132                'default': "1",
133            },
134
135            'user': {
136                'section': 'main',
137                'description': "run as user",
138                'default': "nobody",
139            },
140
141            'group': {
142                'section': 'main',
143                'description': "run as group",
144                'default': "nobody",
145            },
146
147            'plugindir': {
148                'section': 'main',
149                'description': "comma separated list of directories in which fuglu searches for additional plugins and their dependencies",
150                'default': "",
151            },
152
153            'plugins': {
154                'section': 'main',
155                'description': "what SCANNER plugins do we load, comma separated",
156                'default': "archive,attachment,clamav,spamassassin",
157            },
158
159            'prependers': {
160                'section': 'main',
161                'description': "what PREPENDER plugins do we load, comma separated",
162                'default': "debug,skip",
163            },
164
165            'appenders': {
166                'section': 'main',
167                'description': "what APPENDER plugins do we load, comma separated\nappender plugins are plugins run after the scanning plugins\nappenders will always be run, even if a a scanner plugin decided to delete/bounce/whatever a message\n(unless a mail is deferred in which case running the appender would not make sense as it will come again)",
168                'default': "",
169            },
170
171            'bindaddress': {
172                'section': 'main',
173                'description': "address fuglu should listen on. usually 127.0.0.1 so connections are accepted from local host only",
174                'default': "127.0.0.1",
175            },
176
177            'incomingport': {
178                'section': 'main',
179                'description': "incoming port(s) (postfix connects here)\nyou can use multiple comma separated ports here\nf.ex. to separate incoming and outgoing mail and a special port for debugging messages\n10025: standard incoming mail\n10099: outgoing mail\n10888: debug port",
180                'default': "10025,10099,10888",
181            },
182
183            'outgoinghost': {
184                'section': 'main',
185                'description': "outgoing hostname/ip where postfix is listening for re-injects.\nuse ${injecthost} to connect back to the IP where the incoming connection came from",
186                'default': "127.0.0.1",
187            },
188
189            'outgoingport': {
190                'section': 'main',
191                'description': "outgoing port  where postfix is listening for re-injects)",
192                'default': "10026",
193            },
194
195            'outgoinghelo': {
196                'section': 'main',
197                'description': "#outgoing helo we should use for re-injects\nleave empty to auto-detect current hostname",
198                'default': "",
199            },
200
201            'tempdir': {
202                'section': 'main',
203                'description': "temp dir where fuglu can store messages while scanning",
204                'default': "/tmp",
205            },
206
207            'prependaddedheaders': {
208                'section': 'main',
209                'description': "String to prepend to added headers",
210                'default': "X-Fuglu-",
211            },
212
213            'trashdir': {
214                'section': 'main',
215                'description': "If a plugin decides to delete a message, save a copy here\ndefault empty, eg. do not save a backup copy",
216                'default': "",
217            },
218
219            'trashlog': {
220                'section': 'main',
221                'description': "list all deleted messages in 00-fuglutrash.log in the trashdir",
222                'default': "0",
223            },
224
225            'disablebounces': {
226                'section': 'main',
227                'description': "if this is set to True/1/yes , no Bounces will be sent from Fuglu eg. after a blocked attachment has been detected\nThis may be used for debugging/testing to make sure fuglu can not produce backscatter",
228                'default': "0",
229            },
230
231            'debuginfoheader': {
232                'section': 'main',
233                'description': "write debug info header to every mail",
234                'default': "0",
235            },
236
237            'spamstatusheader': {
238                'section': 'main',
239                'description': "write a Spamstatus YES/NO header",
240                'default': "1",
241            },
242
243            'suspectidheader': {
244                'section': 'main',
245                'description': "write suspect ID to every mail",
246                'default': "1",
247            },
248
249            'mrtgdir': {
250                'section': 'main',
251                'description': "write mrtg statistics",
252                'default': "",
253            },
254
255            'controlport': {
256                'section': 'main',
257                'description': "port where fuglu provides statistics etc (used by fuglu_control). Can also be a path to a unix socket",
258                'default': "/tmp/fuglu_control.sock",
259            },
260
261            'logtemplate': {
262                'section': 'main',
263                'description': "Log pattern to use for all suspects in fuglu log. set empty string to disable logging generic suspect info. Supports the usual template variables plus: ${size}, ${spam} ${highspam}, ${modified} ${decision} ${tags} (short tags representagion) ${fulltags} full tags output, ${decision}",
264                'default': 'Suspect ${id} from=${from_address} to=${to_address} size=${size} spam=${spam} virus=${virus} modified=${modified} decision=${decision}',
265            },
266
267            'versioncheck': {
268                'section': 'main',
269                'description': "warn about known severe problems/security issues of current version.\nNote: This performs a DNS lookup of gitrelease.patchlevel.minorversion.majorversion.versioncheck.fuglu.org on startup and fuglu --lint.\nNo other information of any kind is transmitted to outside systems.\nDisable this if you consider the DNS lookup an unwanted information leak.",
270                'default': '1',
271            },
272
273            # performance section
274            'minthreads': {
275                'default': "2",
276                'section': 'performance',
277                'description': 'minimum scanner threads',
278            },
279            'maxthreads': {
280                'default': "40",
281                'section': 'performance',
282                'description': 'maximum scanner threads',
283            },
284            'backend': {
285                'default': "thread",
286                'section': 'performance',
287                'description': "Method for parallelism, either 'thread' or 'process' ",
288            },
289            'initialprocs': {
290                'default': "0",
291                'section': 'performance',
292                'description': "Initial number of processes when backend='process'. If 0 (the default), automatically selects twice the number of available virtual cores. Despite its 'initial'-name, this number currently is not adapted automatically.",
293            },
294
295            # spam section
296            'defaultlowspamaction': {
297                'default': "DUNNO",
298                'section': 'spam',
299                'description': """what to do with messages that plugins think are spam but  not so sure  ("low spam")\nin normal usage you probably never set this something other than DUNNO\nthis is a DEFAULT action, eg. anti spam plugins should take this if you didn't set \n a individual override""",
300            },
301
302            'defaulthighspamaction': {
303                'default': "DUNNO",
304                'section': 'spam',
305                'description': """what to do with messages if a plugin is sure it is spam ("high spam") \nin after-queue mode this is probably still DUNNO or maybe DELETE for courageous people\nthis is a DEFAULT action, eg. anti spam plugins should take this if you didn't set\n a individual override """,
306            },
307
308            # virus section
309            'defaultvirusaction': {
310                'default': "DELETE",
311                'section': 'virus',
312                'description': """#what to do with messages if a plugin detects a virus\nin after-queue mode this should probably be DELETE\nin pre-queue mode you could use REJECT\nthis is a DEFAULT action, eg. anti-virus plugins should take this if you didn't set \n a individual override""",
313            },
314
315            # smtpconnector
316            'requeuetemplate': {
317                'default': "FUGLU REQUEUE(${id}): ${injectanswer}",
318                'section': 'smtpconnector',
319                'description': """confirmation template sent back to the connecting postfix for accepted messages""",
320            },
321
322            # esmtpconnector
323            'queuetemplate': {
324                'default': "${injectanswer}",
325                'section': 'esmtpconnector',
326                'description': """confirmation template sent back to the connecting client for accepted messages""",
327            },
328            'ignore_multiple_recipients': {
329                'default': "0",
330                'section': 'esmtpconnector',
331                'description': """only deliver the message to the first recipient, ignore the others. This is useful in spamtrap setups where we don't want to create duplicate deliveries.""",
332            },
333
334            # databaseconfig
335            'dbconnectstring': {
336                'default': "",
337                'section': 'databaseconfig',
338                'description': """read runtime configuration values from a database. requires sqlalchemy to be installed""",
339                'confidential': True,
340            },
341
342            'sql': {
343                'default': """SELECT value FROM fugluconfig WHERE `section`=:section AND `option`=:option AND `scope` IN ('$GLOBAL',CONCAT('%',:to_domain),:to_address) ORDER BY `scope` DESC""",
344                'section': 'databaseconfig',
345                'description': """sql query that returns a configuration value override. sql placeholders are ':section',':option' in addition the usual suspect filter default values like ':to_domain', ':to_address' etc\nif the statement returns more than one row/value only the first value in the first row is used""",
346            },
347
348            # environment
349            'boundarydistance': {
350                'default': "0",
351                'section': 'environment',
352                'description': """Distance to the boundary MTA ("how many received headers should fuglu skip to determine the last untrusted host information"). Only required if plugins need to have information about the last untrusted host(SPFPlugin)""",
353            },
354            'trustedhostsregex': {
355                'default': "",
356                'section': 'environment',
357                'description': """Optional regex that should be applied to received headers to skip trusted (local) mta helo/ip/reverse dns.\nOnly required if plugins need to have information about the last untrusted host and the message doesn't pass a fixed amount of hops to reach this system in your network""",
358            },
359
360            #  plugin alias
361            'debug': {
362                'default': "fuglu.plugins.p_debug.MessageDebugger",
363                'section': 'PluginAlias',
364            },
365
366            'skip': {
367                'default': "fuglu.plugins.p_skipper.PluginSkipper",
368                'section': 'PluginAlias',
369            },
370
371            'fraction': {
372                'default': "fuglu.plugins.p_fraction.PluginFraction",
373                'section': 'PluginAlias',
374            },
375
376            'archive': {
377                'default': "fuglu.plugins.archive.ArchivePlugin",
378                'section': 'PluginAlias',
379            },
380
381            'attachment': {
382                'default': "fuglu.plugins.attachment.FiletypePlugin",
383                'section': 'PluginAlias',
384            },
385
386            'clamav': {
387                'default': "fuglu.plugins.clamav.ClamavPlugin",
388                'section': 'PluginAlias',
389            },
390
391            'spamassassin': {
392                'default': "fuglu.plugins.sa.SAPlugin",
393                'section': 'PluginAlias',
394            },
395
396            'vacation': {
397                'default': "fuglu.plugins.vacation.VacationPlugin",
398                'section': 'PluginAlias',
399            },
400
401            'actionoverride': {
402                'default': "fuglu.plugins.actionoverride.ActionOverridePlugin",
403                'section': 'PluginAlias',
404            },
405
406            'icap': {
407                'default': "fuglu.plugins.icap.ICAPPlugin",
408                'section': 'PluginAlias',
409            },
410
411            'sssp': {
412                'default': "fuglu.plugins.sssp.SSSPPlugin",
413                'section': 'PluginAlias',
414            },
415
416            'fprot': {
417                'default': "fuglu.plugins.fprot.FprotPlugin",
418                'section': 'PluginAlias',
419            },
420
421            'scriptfilter': {
422                'default': "fuglu.plugins.script.ScriptFilter",
423                'section': 'PluginAlias',
424            },
425
426            'dkimsign': {
427                'default': "fuglu.plugins.domainauth.DKIMSignPlugin",
428                'section': 'PluginAlias',
429            },
430
431            'dkimverify': {
432                'default': "fuglu.plugins.domainauth.DKIMVerifyPlugin",
433                'section': 'PluginAlias',
434            },
435
436            'spf': {
437                'default': "fuglu.plugins.domainauth.SPFPlugin",
438                'section': 'PluginAlias',
439            },
440        }
441
442        self.config = config
443        self.servers = []
444        self.logger = self._logger()
445        self.stayalive = True
446        self.threadpool = None
447        self.procpool = None
448        self.controlserver = None
449        self.started = datetime.datetime.now()
450        self.statsthread = None
451        self.debugconsole = False
452        self._logQueue = logQueue
453        self._logProcessFacQueue = logProcessFacQueue
454        self.configFileUpdates = None
455        self.logConfigFileUpdates = None
456
    @property
    def logQueue(self):
        """Return the log queue object handed to the constructor (None if none was given)."""
        return self._logQueue
460
    @property
    def logProcessFacQueue(self):
        """Return the queue currently stored for new logging configurations (may be None)."""
        return self._logProcessFacQueue

    @logProcessFacQueue.setter
    def logProcessFacQueue(self, lProc):
        """Replace the stored logging-configuration queue with ``lProc``."""
        self._logProcessFacQueue = lProc
468
469    def _logger(self):
470        myclass = self.__class__.__name__
471        loggername = "fuglu.%s" % (myclass,)
472        return logging.getLogger(loggername)
473
474    def start_connector(self, portspec):
475        port = portspec.strip()
476        protocol = 'smtp'
477
478        if port.find(':') > 0:
479            protocol, port = port.split(':')
480
481        self.logger.info("starting connector %s/%s" % (protocol, port))
482        try:
483            port = int(port)
484            if protocol == 'smtp':
485                smtpserver = SMTPServer(
486                    self, port=port, address=self.config.get('main', 'bindaddress'))
487                tr = threading.Thread(target=smtpserver.serve, args=())
488                tr.daemon = True
489                tr.start()
490                self.servers.append(smtpserver)
491            elif protocol == 'esmtp':
492                esmtpserver = ESMTPServer(
493                    self, port=port, address=self.config.get('main', 'bindaddress'))
494                tr = threading.Thread(target=esmtpserver.serve, args=())
495                tr.daemon = True
496                tr.start()
497                self.servers.append(esmtpserver)
498            elif protocol == 'milter':
499                milterserver = MilterServer(
500                    self, port=port, address=self.config.get('main', 'bindaddress'))
501                tr = threading.Thread(target=milterserver.serve, args=())
502                tr.daemon = True
503                tr.start()
504                self.servers.append(milterserver)
505            elif protocol == 'netcat':
506                ncserver = NCServer(
507                    self, port=port, address=self.config.get('main', 'bindaddress'))
508                tr = threading.Thread(target=ncserver.serve, args=())
509                tr.daemon = True
510                tr.start()
511                self.servers.append(ncserver)
512            else:
513                self.logger.error(
514                    'Unknown Interface Protocol: %s, ignoring server on port %s' % (protocol, port))
515        except Exception as e:
516            self.logger.error(
517                "could not start connector %s/%s : %s" % (protocol, port, str(e)))
518
519    def _start_stats_thread(self):
520        self.logger.info("Init Stat Engine")
521        statsthread = StatsThread(self.config)
522        mrtg_stats_thread = threading.Thread(name='MRTG-Statswriter', target=statsthread.writestats, args=())
523        mrtg_stats_thread.daemon = True
524        mrtg_stats_thread.start()
525        return statsthread
526
527    def _start_threadpool(self):
528        self.logger.info("Init Threadpool")
529        try:
530            minthreads = self.config.getint('performance', 'minthreads')
531            maxthreads = self.config.getint('performance', 'maxthreads')
532        except configparser.NoSectionError:
533            self.logger.warning('Performance section not configured, using default thread numbers')
534            minthreads = 1
535            maxthreads = 3
536
537        queuesize = maxthreads * 10
538        return ThreadPool(minthreads, maxthreads, queuesize)
539
540    def _start_processpool(self):
541        numprocs = self.config.getint('performance','initialprocs')
542        if numprocs < 1:
543            numprocs = multiprocessing.cpu_count() *2
544        self.logger.info("Init process pool with %s worker processes"%(numprocs))
545        pool = fuglu.procpool.ProcManager(self._logQueue, numprocs = numprocs, config = self.config)
546        return pool
547
548    def _start_connectors(self):
549        self.logger.info("Starting interface sockets...")
550        ports = self.config.get('main', 'incomingport')
551        for port in ports.split(','):
552            self.start_connector(port)
553
554    def _start_control_server(self):
555        control = ControlServer(self, address=self.config.get(
556            'main', 'bindaddress'), port=self.config.get('main', 'controlport'))
557        ctrl_server_thread = threading.Thread(
558            name='Control server', target=control.serve, args=())
559        ctrl_server_thread.daemon = True
560        ctrl_server_thread.start()
561        return control
562
563    def _run_main_loop(self):
564        if self.debugconsole:
565            self.run_debugconsole()
566        else:
567            if self.config.getboolean('main', 'versioncheck'):
568                # log possible issues with this version
569                check_version_status()
570
571            # mainthread dummy loop
572            while self.stayalive:
573                try:
574                    time.sleep(1)
575                except KeyboardInterrupt:
576                    self.stayalive = False
577
578    def startup(self):
579        self.load_extensions()
580        ok = self.load_plugins()
581        if not ok:
582            sys.stderr.write(
583                "Some plugins failed to load, please check the logs. Aborting.\n")
584            self.logger.info('Fuglu shut down after fatal error condition')
585            sys.exit(1)
586
587        self.statsthread = self._start_stats_thread()
588        backend = self.config.get('performance','backend')
589        if backend == 'process':
590            self.procpool = self._start_processpool()
591        else: # default backend is 'thread'
592            self.threadpool = self._start_threadpool()
593
594        self._start_connectors()
595        self.controlserver = self._start_control_server()
596
597        self.logger.info('Startup complete')
598        self._run_main_loop()
599        self.shutdown()
600
    def run_debugconsole(self):
        """Run an interactive Python console (code.InteractiveConsole).

        Every name bound locally in this method is handed to the console through
        ``locals()``: the names imported from fuglu.shared below, ``readline``,
        ``self`` and ``mc`` (this controller).
        """
        # imported locally so these names are available inside the console
        from fuglu.shared import DUNNO, ACCEPT, DELETE, REJECT, DEFER, Suspect

        # do not import readline at the top, it will cause undesired output, for example when generating the default config
        # http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
        import readline

        print("Fuglu Interactive Console started")
        print("")
        print("pre-defined locals:")

        mc = self
        print("mc : maincontroller")

        # locals() must be taken after all of the names above are bound
        terp = code.InteractiveConsole(locals())
        terp.interact("")
617
618    def run_netconsole(self, port=1337, address="0.0.0.0"):
619        """start a network console"""
620        old_stdin = sys.stdin
621        old_stdout = sys.stdout
622        old_stderr = sys.stderr
623
624        addr_f = socket.getaddrinfo(address, 0)[0][0]
625
626        serversocket = socket.socket(addr_f)
627        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
628        serversocket.bind((address, port))
629        serversocket.listen(1)
630        clientsocket, _ = serversocket.accept()  # client socket
631        self.logger.info("Interactive python connection from %s/%s" % (address, address))
632
633        class sw(object):  # socket wrapper
634            def __init__(self, s):
635                self.s = s
636
637            def read(self, length):
638                return force_uString(self.s.recv(length))
639
640            def write(self, st):
641                return self.s.send(force_bString(st))
642
643            def readline(self):
644                return self.read(256)
645        sw = sw(clientsocket)
646        sys.stdin = sw
647        sys.stdout = sw
648        sys.stderr = sw
649        mc = self
650        terp = code.InteractiveConsole(locals())
651        try:
652            terp.interact("Fuglu Python Shell - MainController available as 'mc'")
653        except Exception:
654            pass
655        self.logger.info("done talking to %s - closing interactive shell on %s/%s" % (address, address, port))
656        sys.stdin = old_stdin
657        sys.stdout = old_stdout
658        sys.stderr = old_stderr
659        try:
660            clientsocket.close()
661        except Exception as e:
662            self.logger.warning("Failed to close shell client socket: %s" % str(e))
663        try:
664            serversocket.close()
665        except Exception as e:
666            self.logger.warning("Failed to close shell server socket: %s" % str(e))
667
668    def reload(self):
669        """apply config changes"""
670        self.logger.info('Applying configuration changes...')
671
672        backend = self.config.get('performance','backend')
673
674        if backend == 'thread':
675            if self.threadpool is not None:
676                minthreads = self.config.getint('performance', 'minthreads')
677                maxthreads = self.config.getint('performance', 'maxthreads')
678
679                # threadpool changes?
680                if self.threadpool.minthreads != minthreads or self.threadpool.maxthreads != maxthreads:
681                    self.logger.info('Threadpool config changed, initialising new threadpool')
682                    currentthreadpool = self.threadpool
683                    self.threadpool = self._start_threadpool()
684                    currentthreadpool.stayalive = False
685                else:
686                    self.logger.info('Keep existing threadpool')
687            else:
688                self.logger.info('Create new threadpool')
689                self.threadpool = self._start_threadpool()
690
691            # stop existing procpool
692            if self.procpool is not None:
693                self.logger.info('Delete old procpool')
694                self.procpool.shutdown()
695                self.procpool = None
696
697        elif backend == 'process':
698            # start new procpool
699            currentProcPool = self.procpool
700            self.logger.info('Create new processpool')
701            self.procpool = self._start_processpool()
702
703            # stop existing procpool
704            # -> the procpool has to be recreated to take configuration changes
705            #    into account (each worker process has its own controller unlike using threadpool)
706            if currentProcPool is not None:
707                self.logger.info('Delete old processpool')
708                currentProcPool.shutdown()
709
710            # stop existing threadpool
711            if self.threadpool is not None:
712                self.logger.info('Delete old threadpool')
713                self.threadpool.stayalive = False
714                self.threadpool = None
715        else:
716            self.logger.error('backend not detected -> ignoring input!')
717
718        # smtp engine changes?
719        ports = self.config.get('main', 'incomingport')
720        portspeclist = ports.split(',')
721        portlist = []
722
723        for portspec in portspeclist:
724            if portspec.find(':') > 0:
725                (protocol, port) = portspec.split(':')
726                port = int(port)
727            else:
728                port = int(portspec)
729            portlist.append(port)
730            alreadyRunning = False
731            for serv in self.servers:
732                if serv.port == port:
733                    alreadyRunning = True
734                    break
735
736            if not alreadyRunning:
737                self.logger.info('start new connector at %s' % str(portspec))
738                self.start_connector(portspec)
739            else:
740                self.logger.info('keep connector at %s' % str(portspec))
741
742        servercopy = self.servers[:]
743        for serv in servercopy:
744            if serv.port not in portlist:
745                self.logger.info('Closing server socket on port %s' % serv.port)
746                serv.shutdown()
747                self.servers.remove(serv)
748            else:
749                self.logger.info('Keep server socket on port %s' % serv.port)
750
751        self.logger.info('Config changes applied')
752
753    def shutdown(self):
754        if self.statsthread:
755            self.statsthread.stayalive = False
756        for server in self.servers:
757            self.logger.info('Closing server socket on port %s' % server.port)
758            server.shutdown()
759
760        if self.controlserver is not None:
761            self.controlserver.shutdown()
762
763        if self.threadpool:
764            self.threadpool.stayalive = False
765        if self.procpool:
766            self.procpool.stayalive = False
767
768        self.stayalive = False
769        self.logger.info('Shutdown complete')
770        self.logger.info('Remaining threads: %s' % threading.enumerate())
771
772    def _lint_dependencies(self, fc):
773        print(fc.strcolor('Checking dependencies...', 'magenta'))
774        try:
775            import sqlalchemy
776            print(fc.strcolor('sqlalchemy: Version %s installed' % sqlalchemy.__version__, 'green'))
777        except:
778            print(fc.strcolor('sqlalchemy: not installed', 'yellow') +
779                  " Optional dependency, required if you want to enable any database lookups")
780
781        if HAVE_BEAUTIFULSOUP:
782            print(
783                fc.strcolor('BeautifulSoup: V%s installed' % BS_VERSION, 'green'))
784        else:
785            print(fc.strcolor('BeautifulSoup: not installed', 'yellow') +
786                  " Optional dependency, this improves accuracy for stripped body searches in filters - not required with a default config")
787
788        try:
789            import magic
790
791            if hasattr(magic, 'open'):
792                magic_vers = "python-file/libmagic bindings (http://www.darwinsys.com/file/)"
793                print(fc.strcolor('magic: found %s' % magic_vers, 'green'))
794            elif hasattr(magic, 'from_buffer'):
795                magic_vers = "python-magic (https://github.com/ahupp/python-magic)"
796                print(fc.strcolor('magic: found %s' % magic_vers, 'green'))
797            else:
798                print(fc.strcolor('magic: unsupported version', 'yellow') +
799                      " File type detection requires either the python bindings from http://www.darwinsys.com/file/ or python magic from https://github.com/ahupp/python-magic")
800        except:
801            print(fc.strcolor('magic: not installed', 'yellow') +
802                  " Optional dependency, without python-file or python-magic the attachment plugin's automatic file type detection will easily be fooled")
803
804    def lint(self):
805        errors = 0
806        fc = FunkyConsole()
807        self._lint_dependencies(fc)
808
809        print(fc.strcolor('Loading extensions...', 'magenta'))
810        exts = self.load_extensions()
811        for ext in exts:
812            (name, enabled, status) = ext
813            pname = fc.strcolor(name, 'cyan')
814            if enabled:
815                penabled = fc.strcolor('enabled', 'green')
816            else:
817                penabled = fc.strcolor('disabled', 'red')
818            print("%s: %s (%s)" % (pname, penabled, status))
819
820        print(fc.strcolor('Loading plugins...', 'magenta'))
821        if not self.load_plugins():
822            print(fc.strcolor('At least one plugin failed to load', 'red'))
823        print(fc.strcolor('Plugin loading complete', 'magenta'))
824
825        print("Linting ", fc.strcolor("main configuration", 'cyan'))
826        if not self.checkConfig():
827            print(fc.strcolor("ERROR", "red"))
828        else:
829            print(fc.strcolor("OK", "green"))
830
831        trashdir = self.config.get('main', 'trashdir').strip()
832        if trashdir != "" and not os.path.isdir(trashdir):
833            print(fc.strcolor("Trashdir %s does not exist" % trashdir, 'red'))
834
835        # sql config override
836        sqlconfigdbconnectstring = self.config.get('databaseconfig', 'dbconnectstring')
837        if sqlconfigdbconnectstring.strip() != '':
838            print()
839            print("Linting ", fc.strcolor("sql configuration", 'cyan'))
840            try:
841                from fuglu.extensions.sql import get_session
842                sess = get_session(sqlconfigdbconnectstring)
843                tempsuspect = Suspect(
844                    'sender@example.com', 'recipient@example.com', '/dev/null')
845                sqlvars = dict(
846                    section='testsection', option='testoption', scope='$GLOBAL')
847                default_template_values(tempsuspect, sqlvars)
848                sess.execute(self.config.get('databaseconfig', 'sql'), sqlvars)
849                sess.remove()
850                print(fc.strcolor("OK", 'green'))
851            except Exception as e:
852                print(fc.strcolor("Failed %s" % str(e), 'red'))
853
854        allplugins = self.plugins + self.prependers + self.appenders
855
856        for plugin in allplugins:
857            print()
858            print("Linting Plugin ", fc.strcolor(str(plugin), 'cyan'),
859                  'Config section:', fc.strcolor(str(plugin.section), 'cyan'))
860            try:
861                result = plugin.lint()
862            except Exception as e:
863                CrashStore.store_exception()
864                print("ERROR: %s" % e)
865                result = False
866
867            if result:
868                print(fc.strcolor("OK", "green"))
869            else:
870                errors = errors + 1
871                print(fc.strcolor("ERROR", "red"))
872        print("%s plugins reported errors." % errors)
873
874        if self.config.getboolean('main', 'versioncheck'):
875            check_version_status(lint=True)
876
877    def propagate_defaults(self, requiredvars, config, defaultsection=None):
878        """propagate defaults from requiredvars if they are missing in config"""
879        for option, infodic in requiredvars.items():
880            if 'section' in infodic:
881                section = infodic['section']
882            else:
883                section = defaultsection
884
885            default = infodic['default']
886
887            if not config.has_section(section):
888                config.add_section(section)
889
890            if not config.has_option(section, option):
891                config.set(section, option, default)
892
893    def propagate_core_defaults(self):
894        """check for missing core config options and try to fill them with defaults
895        must be called before we can do plugin loading stuff
896        """
897        self.propagate_defaults(self.requiredvars, self.config, 'main')
898
899    def propagate_plugin_defaults(self):
900        """propagate defaults from loaded lugins"""
901        #plugins, prependers, appenders
902        allplugs = self.plugins + self.prependers + self.appenders
903        for plug in allplugs:
904            if hasattr(plug, 'requiredvars'):
905                requiredvars = getattr(plug, 'requiredvars')
906                if type(requiredvars) == dict:
907                    self.propagate_defaults(requiredvars, self.config, plug.section)
908
909    def checkConfig(self):
910        """Check if all requred options are in the config file
911        Fill missing values with defaults if possible
912        """
913        allOK = True
914        for config, infodic in self.requiredvars.items():
915            section = infodic['section']
916            try:
917                var = self.config.get(section, config)
918
919                if 'validator' in infodic and not infodic["validator"](var):
920                    print("Validation failed for [%s] :: %s" % (section, config))
921                    allOK = False
922
923            except configparser.NoSectionError:
924                print("Missing configuration section [%s] :: %s" % (section, config))
925                allOK = False
926            except configparser.NoOptionError:
927                print("Missing configuration value [%s] :: %s" % (section, config))
928                allOK = False
929        return allOK
930
931    def load_extensions(self):
932        """load fuglu extensions"""
933        ret = []
934        import fuglu.extensions
935        for extension in fuglu.extensions.__all__:
936            mod = __import__('fuglu.extensions.%s' % extension)
937            ext = getattr(mod, 'extensions')
938            fl = getattr(ext, extension)
939            enabled = getattr(fl, 'ENABLED')
940            status = getattr(fl, 'STATUS')
941            name = getattr(fl, '__name__')
942            ret.append((name, enabled, status))
943        return ret
944
945    def get_component_by_alias(self, pluginalias):
946        """Returns the full plugin component from an alias. if this alias is not configured, return the original string"""
947        if not self.config.has_section('PluginAlias'):
948            return pluginalias
949
950        if not self.config.has_option('PluginAlias', pluginalias):
951            return pluginalias
952
953        return self.config.get('PluginAlias', pluginalias)
954
955    def load_plugins(self):
956        """load plugins defined in config"""
957        allOK = True
958        plugindirs = self.config.get('main', 'plugindir').strip().split(',')
959        for plugindir in plugindirs:
960            if os.path.isdir(plugindir):
961                self.logger.debug('Searching for additional plugins in %s' % plugindir)
962                if plugindir not in sys.path:
963                    sys.path.insert(0, plugindir)
964            else:
965                self.logger.warning('Plugin directory %s not found' % plugindir)
966
967        self.logger.debug('Module search path %s' % sys.path)
968        self.logger.debug('Loading scanner plugins')
969        newplugins, loadok = self._load_all(self.config.get('main', 'plugins'))
970        if not loadok:
971            allOK = False
972
973        newprependers, loadok = self._load_all(
974            self.config.get('main', 'prependers'))
975        if not loadok:
976            allOK = False
977
978        newappenders, loadok = self._load_all(
979            self.config.get('main', 'appenders'))
980        if not loadok:
981            allOK = False
982
983        if allOK:
984            self.plugins = newplugins
985            self.prependers = newprependers
986            self.appenders = newappenders
987            self.propagate_plugin_defaults()
988
989        return allOK
990
991    def _load_all(self, configstring):
992        """load all plugins from config string. returns tuple ([list of loaded instances],allOk)"""
993        pluglist = []
994        config_re = re.compile(
995            """^(?P<structured_name>[a-zA-Z0-9\.\_\-]+)(?:\((?P<config_override>[a-zA-Z0-9\.\_\-]+)\))?$""")
996        allOK = True
997        plugins = configstring.split(',')
998        for plug in plugins:
999            if plug == "":
1000                continue
1001            m = config_re.match(plug)
1002            if m is None:
1003                self.logger.error('Invalid Plugin Syntax: %s' % plug)
1004                allOK = False
1005                continue
1006            structured_name, configoverride = m.groups()
1007            structured_name = self.get_component_by_alias(structured_name)
1008            try:
1009                plugininstance = self._load_component(
1010                    structured_name, configsection=configoverride)
1011                pluglist.append(plugininstance)
1012            except (configparser.NoSectionError, configparser.NoOptionError):
1013                CrashStore.store_exception()
1014                self.logger.error("The plugin %s is accessing the config in __init__ -> can not load default values" % structured_name)
1015            except Exception as e:
1016                CrashStore.store_exception()
1017                self.logger.error('Could not load plugin %s : %s' %
1018                                     (structured_name, e))
1019                exc = traceback.format_exc()
1020                self.logger.error(exc)
1021                allOK = False
1022
1023        return pluglist, allOK
1024
1025    def _load_component(self, structured_name, configsection=None):
1026        # from:
1027        # http://mail.python.org/pipermail/python-list/2003-May/204392.html
1028        component_names = structured_name.split('.')
1029        mod = __import__('.'.join(component_names[:-1]))
1030        for component_name in component_names[1:]:
1031            mod = getattr(mod, component_name)
1032
1033        if configsection is None:
1034            plugininstance = mod(self.config)
1035        else:
1036            # check if plugin supports config override
1037            if 'section' in inspect.getargspec(mod.__init__)[0]:
1038                plugininstance = mod(self.config, section=configsection)
1039            else:
1040                raise Exception('Cannot set Config Section %s : Plugin %s does not support config override' % (
1041                    configsection, mod))
1042        return plugininstance
1043
1044