1#   Copyright 2009-2018 Oli Schacher
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15#
16#
17
18from fuglu.shared import DUNNO, ACCEPT, REJECT, DEFER, DELETE
19from fuglu.debug import CrashStore
20import logging
21from fuglu.stats import Statskeeper, StatDelta
22import sys
23import traceback
24import tempfile
25import time
26import os
27import datetime
28
29
30class SessionHandler(object):
31
32    """thread handling one message"""
33
34    def __init__(self, protohandler, config, prependers, plugins, appenders):
35        self.logger = logging.getLogger("fuglu.SessionHandler")
36        self.action = DUNNO
37        self.config = config
38        self.prependers = prependers
39        self.plugins = plugins
40        self.appenders = appenders
41        self.stats = Statskeeper()
42        self.worker = None
43        self.message = None
44        self.protohandler = protohandler
45
46    def set_workerstate(self, status):
47        if self.worker is not None:
48            self.worker.workerstate = status
49
50    def handlesession(self, worker=None):
51        self.worker = worker
52
53        prependheader = self.config.get('main', 'prependaddedheaders')
54        try:
55            self.set_workerstate('receiving message')
56            suspect = self.protohandler.get_suspect()
57            if suspect is None:
58                self.logger.error('No Suspect retrieved, ending session')
59                return
60            self.stats.increase_counter_values(StatDelta(in_=1))
61
62            if len(suspect.recipients) != 1:
63                self.logger.warning('Notice: Message from %s has %s recipients. Plugins supporting only one recipient will see: %s' % (
64                    suspect.from_address, len(suspect.recipients), suspect.to_address))
65            self.logger.debug("Message from %s to %s: %s bytes stored to %s" % (
66                suspect.from_address, suspect.to_address, suspect.size, suspect.tempfile))
67            self.set_workerstate("Handling message %s" % suspect)
68            # store incoming port to tag, could be used to disable plugins
69            # based on port
70            try:
71                port = self.protohandler.socket.getsockname()[1]
72                if port is not None:
73                    suspect.tags['incomingport'] = port
74            except Exception as e:
75                self.logger.warning('Could not get incoming port: %s' % str(e))
76
77            pluglist = self.run_prependers(suspect)
78
79            starttime = time.time()
80            self.run_plugins(suspect, pluglist)
81
82            # Set fuglu spam status if wanted
83            if self.config.getboolean('main', 'spamstatusheader'):
84                if suspect.is_spam():
85                    suspect.addheader("%sSpamstatus" % prependheader, 'YES')
86                else:
87                    suspect.addheader("%sSpamstatus" % prependheader, 'NO')
88
89            # how long did it all take?
90            difftime = time.time() - starttime
91            suspect.tags['fuglu.scantime'] = "%.4f" % difftime
92
93            # Debug info to mail
94            if self.config.getboolean('main', 'debuginfoheader'):
95                debuginfo = str(suspect)
96                suspect.addheader("%sDebuginfo" % prependheader, debuginfo)
97
98            # add suspect id for tracking
99            if self.config.getboolean('main', 'suspectidheader'):
100                suspect.addheader('%sSuspect' % prependheader, suspect.id)
101
102            # checks done.. print out suspect status
103            logformat = self.config.get('main', 'logtemplate')
104            if logformat.strip() != '':
105                self.logger.info(suspect.log_format(logformat))
106            suspect.debug(suspect)
107
108            # check if one of the plugins made a decision
109            result = self.action
110
111            self.set_workerstate("Finishing message %s" % suspect)
112
113            message_is_deferred = False
114            if result == ACCEPT or result == DUNNO:
115                try:
116                    self.protohandler.commitback(suspect)
117                    self.stats.increase_counter_values(StatDelta(out=1))
118
119                except KeyboardInterrupt:
120                    sys.exit()
121                except Exception:
122                    message_is_deferred = True
123                    trb = traceback.format_exc()
124                    self.logger.error("Could not commit message. Error: %s" % trb)
125                    self._defer()
126
127            elif result == DELETE:
128                self.logger.info("MESSAGE DELETED: %s" % suspect.id)
129                retmesg = 'OK: (%s)' % suspect.id
130                if self.message is not None:
131                    retmesg = self.message
132                self.protohandler.discard(retmesg)
133            elif result == REJECT:
134                retmesg = "Rejected by content scanner"
135                if self.message is not None:
136                    retmesg = self.message
137                self.protohandler.reject(retmesg)
138            elif result == DEFER:
139                message_is_deferred = True
140                self._defer(self.message)
141            else:
142                self.logger.error(
143                    'Invalid Message action Code: %s. Using DEFER' % result)
144                message_is_deferred = True
145                self._defer()
146
147            # run appenders (stats plugin etc) unless msg is deferred
148            if not message_is_deferred:
149                self.stats.increasecounters(suspect)
150                self.run_appenders(suspect, result)
151            else:
152                self.logger.warning("DEFERRED %s" % suspect.id)
153
154            # clean up
155            try:
156                os.remove(suspect.tempfile)
157                self.logger.debug('Removed tempfile %s' % suspect.tempfile)
158            except OSError:
159                self.logger.warning('Could not remove tempfile %s' % suspect.tempfile)
160        except KeyboardInterrupt:
161            sys.exit(0)
162        except ValueError:
163            self._defer()
164        except Exception as e:
165            exc = traceback.format_exc()
166            self.logger.error('Exception %s: %s' % (e, exc))
167            self._defer()
168
169        self.logger.debug('Session finished')
170
171
172    def _defer(self, message=None):
173        if message is None:
174            message="internal problem - message deferred"
175
176        # try to end the session gracefully, but this might cause the same exception again,
177        # in case of a broken pipe for example
178        try:
179            self.protohandler.defer(message)
180        except Exception:
181            pass
182
183
184    def trash(self, suspect, killerplugin=None):
185        """copy suspect to trash if this is enabled"""
186        trashdir = self.config.get('main', 'trashdir').strip()
187        if trashdir == "":
188            return
189
190        if not os.path.isdir(trashdir):
191            try:
192                os.makedirs(trashdir)
193            except:
194                self.logger.error(
195                    "Trashdir %s does not exist and could not be created" % trashdir)
196                return
197            self.logger.info('Created trashdir %s' % trashdir)
198
199        trashfilename = ''
200        try:
201            handle, trashfilename = tempfile.mkstemp(
202                prefix=suspect.id, dir=self.config.get('main', 'trashdir'))
203            with os.fdopen(handle, 'w+b') as trashfile:
204                trashfile.write(suspect.get_source())
205            self.logger.debug('Message stored to trash: %s' % trashfilename)
206        except Exception as e:
207            self.logger.error(
208                "could not create file %s: %s" % (trashfilename, e))
209
210        # TODO: document main.trashlog
211        if self.config.has_option('main', 'trashlog') and self.config.getboolean('main', 'trashlog'):
212            try:
213                with open('%s/00-fuglutrash.log' % self.config.get('main', 'trashdir'), 'a') as handle:
214                    # <date> <time> <from address> <to address> <plugin that said "DELETE"> <filename>
215                    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
216                    handle.write("%s %s %s %s %s" % (
217                        now, suspect.from_address, suspect.to_address, killerplugin, trashfilename))
218                    handle.write("\n")
219            except Exception as e:
220                self.logger.error("Could not update trash log: %s" % e)
221
222    def run_plugins(self, suspect, pluglist):
223        """Run scannerplugins on suspect"""
224        suspect.debug('Will run plugins: %s' % pluglist)
225        for plugin in pluglist:
226            try:
227                self.logger.debug('Running plugin %s' % plugin)
228                self.set_workerstate(
229                    "%s : Running Plugin %s" % (suspect, plugin))
230                suspect.debug('Running plugin %s' % str(plugin))
231                starttime = time.time()
232                ans = plugin.examine(suspect)
233                plugintime = time.time() - starttime
234                suspect.tags['scantimes'].append((plugin.section, plugintime))
235                message = None
236                if type(ans) is tuple:
237                    result, message = ans
238                else:
239                    result = ans
240
241                if result is None:
242                    result = DUNNO
243
244                suspect.tags['decisions'].append((plugin.section, result))
245
246                if result == DUNNO:
247                    suspect.debug('Plugin makes no final decision')
248                elif result == ACCEPT:
249                    suspect.debug(
250                        'Plugin accepts the message - skipping all further tests')
251                    self.logger.debug(
252                        'Plugin says: ACCEPT. Skipping all other tests')
253                    self.action = ACCEPT
254                    break
255                elif result == DELETE:
256                    suspect.debug(
257                        'Plugin DELETES this message - no further tests')
258                    self.logger.debug(
259                        'Plugin says: DELETE. Skipping all other tests')
260                    self.action = DELETE
261                    self.message = message
262                    self.trash(suspect, str(plugin))
263                    break
264                elif result == REJECT:
265                    suspect.debug(
266                        'Plugin REJECTS this message - no further tests')
267                    self.logger.debug(
268                        'Plugin says: REJECT. Skipping all other tests')
269                    self.action = REJECT
270                    self.message = message
271                    break
272                elif result == DEFER:
273                    suspect.debug(
274                        'Plugin DEFERS this message - no further tests')
275                    self.logger.debug(
276                        'Plugin says: DEFER. Skipping all other tests')
277                    self.action = DEFER
278                    self.message = message
279                    break
280                else:
281                    self.logger.error(
282                        'Invalid Message action Code: %s. Using DUNNO' % result)
283
284            except Exception as e:
285                CrashStore.store_exception()
286                exc = traceback.format_exc()
287                self.logger.error('Plugin %s failed: %s' % (str(plugin), exc))
288                suspect.debug(
289                    'Plugin failed : %s . Please check fuglu log for more details' % e)
290
291    def run_prependers(self, suspect):
292        """Run prependers on suspect"""
293        plugcopy = self.plugins[:]
294        for plugin in self.prependers:
295            try:
296                self.logger.debug('Running prepender %s' % plugin)
297                self.set_workerstate(
298                    "%s : Running Prepender %s" % (suspect, plugin))
299                starttime = time.time()
300                result = plugin.pluginlist(suspect, plugcopy)
301                plugintime = time.time() - starttime
302                suspect.tags['scantimes'].append((plugin.section, plugintime))
303                if result is not None:
304                    plugcopyset = set(plugcopy)
305                    resultset = set(result)
306                    removed = list(plugcopyset - resultset)
307                    added = list(resultset - plugcopyset)
308                    if len(removed) > 0:
309                        self.logger.debug(
310                            'Prepender %s removed plugins: %s' % (plugin, list(map(str, removed))))
311                    if len(added) > 0:
312                        self.logger.debug(
313                            'Prepender %s added plugins: %s' % (plugin, list(map(str, added))))
314                    plugcopy = result
315
316            except Exception:
317                CrashStore.store_exception()
318                exc = traceback.format_exc()
319                self.logger.error(
320                    'Prepender plugin %s failed: %s' % (str(plugin), exc))
321        return plugcopy
322
323    def run_appenders(self, suspect, finaldecision):
324        """Run appenders on suspect"""
325        if suspect.get_tag('noappenders'):
326            return
327
328        for plugin in self.appenders:
329            try:
330                self.logger.debug('Running appender %s' % plugin)
331                suspect.debug('Running appender %s' % plugin)
332                self.set_workerstate(
333                    "%s : Running appender %s" % (suspect, plugin))
334                starttime = time.time()
335                plugin.process(suspect, finaldecision)
336                plugintime = time.time() - starttime
337                suspect.tags['scantimes'].append((plugin.section, plugintime))
338            except Exception:
339                CrashStore.store_exception()
340                exc = traceback.format_exc()
341                self.logger.error(
342                    'Appender plugin %s failed: %s' % (str(plugin), exc))
343