1# Copyright 2009-2018 Oli Schacher 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# 15# 16# 17 18from fuglu.shared import DUNNO, ACCEPT, REJECT, DEFER, DELETE 19from fuglu.debug import CrashStore 20import logging 21from fuglu.stats import Statskeeper, StatDelta 22import sys 23import traceback 24import tempfile 25import time 26import os 27import datetime 28 29 30class SessionHandler(object): 31 32 """thread handling one message""" 33 34 def __init__(self, protohandler, config, prependers, plugins, appenders): 35 self.logger = logging.getLogger("fuglu.SessionHandler") 36 self.action = DUNNO 37 self.config = config 38 self.prependers = prependers 39 self.plugins = plugins 40 self.appenders = appenders 41 self.stats = Statskeeper() 42 self.worker = None 43 self.message = None 44 self.protohandler = protohandler 45 46 def set_workerstate(self, status): 47 if self.worker is not None: 48 self.worker.workerstate = status 49 50 def handlesession(self, worker=None): 51 self.worker = worker 52 53 prependheader = self.config.get('main', 'prependaddedheaders') 54 try: 55 self.set_workerstate('receiving message') 56 suspect = self.protohandler.get_suspect() 57 if suspect is None: 58 self.logger.error('No Suspect retrieved, ending session') 59 return 60 self.stats.increase_counter_values(StatDelta(in_=1)) 61 62 if len(suspect.recipients) != 1: 63 self.logger.warning('Notice: Message from %s has %s recipients. Plugins supporting only one recipient will see: %s' % ( 64 suspect.from_address, len(suspect.recipients), suspect.to_address)) 65 self.logger.debug("Message from %s to %s: %s bytes stored to %s" % ( 66 suspect.from_address, suspect.to_address, suspect.size, suspect.tempfile)) 67 self.set_workerstate("Handling message %s" % suspect) 68 # store incoming port to tag, could be used to disable plugins 69 # based on port 70 try: 71 port = self.protohandler.socket.getsockname()[1] 72 if port is not None: 73 suspect.tags['incomingport'] = port 74 except Exception as e: 75 self.logger.warning('Could not get incoming port: %s' % str(e)) 76 77 pluglist = self.run_prependers(suspect) 78 79 starttime = time.time() 80 self.run_plugins(suspect, pluglist) 81 82 # Set fuglu spam status if wanted 83 if self.config.getboolean('main', 'spamstatusheader'): 84 if suspect.is_spam(): 85 suspect.addheader("%sSpamstatus" % prependheader, 'YES') 86 else: 87 suspect.addheader("%sSpamstatus" % prependheader, 'NO') 88 89 # how long did it all take? 90 difftime = time.time() - starttime 91 suspect.tags['fuglu.scantime'] = "%.4f" % difftime 92 93 # Debug info to mail 94 if self.config.getboolean('main', 'debuginfoheader'): 95 debuginfo = str(suspect) 96 suspect.addheader("%sDebuginfo" % prependheader, debuginfo) 97 98 # add suspect id for tracking 99 if self.config.getboolean('main', 'suspectidheader'): 100 suspect.addheader('%sSuspect' % prependheader, suspect.id) 101 102 # checks done.. print out suspect status 103 logformat = self.config.get('main', 'logtemplate') 104 if logformat.strip() != '': 105 self.logger.info(suspect.log_format(logformat)) 106 suspect.debug(suspect) 107 108 # check if one of the plugins made a decision 109 result = self.action 110 111 self.set_workerstate("Finishing message %s" % suspect) 112 113 message_is_deferred = False 114 if result == ACCEPT or result == DUNNO: 115 try: 116 self.protohandler.commitback(suspect) 117 self.stats.increase_counter_values(StatDelta(out=1)) 118 119 except KeyboardInterrupt: 120 sys.exit() 121 except Exception: 122 message_is_deferred = True 123 trb = traceback.format_exc() 124 self.logger.error("Could not commit message. Error: %s" % trb) 125 self._defer() 126 127 elif result == DELETE: 128 self.logger.info("MESSAGE DELETED: %s" % suspect.id) 129 retmesg = 'OK: (%s)' % suspect.id 130 if self.message is not None: 131 retmesg = self.message 132 self.protohandler.discard(retmesg) 133 elif result == REJECT: 134 retmesg = "Rejected by content scanner" 135 if self.message is not None: 136 retmesg = self.message 137 self.protohandler.reject(retmesg) 138 elif result == DEFER: 139 message_is_deferred = True 140 self._defer(self.message) 141 else: 142 self.logger.error( 143 'Invalid Message action Code: %s. Using DEFER' % result) 144 message_is_deferred = True 145 self._defer() 146 147 # run appenders (stats plugin etc) unless msg is deferred 148 if not message_is_deferred: 149 self.stats.increasecounters(suspect) 150 self.run_appenders(suspect, result) 151 else: 152 self.logger.warning("DEFERRED %s" % suspect.id) 153 154 # clean up 155 try: 156 os.remove(suspect.tempfile) 157 self.logger.debug('Removed tempfile %s' % suspect.tempfile) 158 except OSError: 159 self.logger.warning('Could not remove tempfile %s' % suspect.tempfile) 160 except KeyboardInterrupt: 161 sys.exit(0) 162 except ValueError: 163 self._defer() 164 except Exception as e: 165 exc = traceback.format_exc() 166 self.logger.error('Exception %s: %s' % (e, exc)) 167 self._defer() 168 169 self.logger.debug('Session finished') 170 171 172 def _defer(self, message=None): 173 if message is None: 174 message="internal problem - message deferred" 175 176 # try to end the session gracefully, but this might cause the same exception again, 177 # in case of a broken pipe for example 178 try: 179 self.protohandler.defer(message) 180 except Exception: 181 pass 182 183 184 def trash(self, suspect, killerplugin=None): 185 """copy suspect to trash if this is enabled""" 186 trashdir = self.config.get('main', 'trashdir').strip() 187 if trashdir == "": 188 return 189 190 if not os.path.isdir(trashdir): 191 try: 192 os.makedirs(trashdir) 193 except: 194 self.logger.error( 195 "Trashdir %s does not exist and could not be created" % trashdir) 196 return 197 self.logger.info('Created trashdir %s' % trashdir) 198 199 trashfilename = '' 200 try: 201 handle, trashfilename = tempfile.mkstemp( 202 prefix=suspect.id, dir=self.config.get('main', 'trashdir')) 203 with os.fdopen(handle, 'w+b') as trashfile: 204 trashfile.write(suspect.get_source()) 205 self.logger.debug('Message stored to trash: %s' % trashfilename) 206 except Exception as e: 207 self.logger.error( 208 "could not create file %s: %s" % (trashfilename, e)) 209 210 # TODO: document main.trashlog 211 if self.config.has_option('main', 'trashlog') and self.config.getboolean('main', 'trashlog'): 212 try: 213 with open('%s/00-fuglutrash.log' % self.config.get('main', 'trashdir'), 'a') as handle: 214 # <date> <time> <from address> <to address> <plugin that said "DELETE"> <filename> 215 now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 216 handle.write("%s %s %s %s %s" % ( 217 now, suspect.from_address, suspect.to_address, killerplugin, trashfilename)) 218 handle.write("\n") 219 except Exception as e: 220 self.logger.error("Could not update trash log: %s" % e) 221 222 def run_plugins(self, suspect, pluglist): 223 """Run scannerplugins on suspect""" 224 suspect.debug('Will run plugins: %s' % pluglist) 225 for plugin in pluglist: 226 try: 227 self.logger.debug('Running plugin %s' % plugin) 228 self.set_workerstate( 229 "%s : Running Plugin %s" % (suspect, plugin)) 230 suspect.debug('Running plugin %s' % str(plugin)) 231 starttime = time.time() 232 ans = plugin.examine(suspect) 233 plugintime = time.time() - starttime 234 suspect.tags['scantimes'].append((plugin.section, plugintime)) 235 message = None 236 if type(ans) is tuple: 237 result, message = ans 238 else: 239 result = ans 240 241 if result is None: 242 result = DUNNO 243 244 suspect.tags['decisions'].append((plugin.section, result)) 245 246 if result == DUNNO: 247 suspect.debug('Plugin makes no final decision') 248 elif result == ACCEPT: 249 suspect.debug( 250 'Plugin accepts the message - skipping all further tests') 251 self.logger.debug( 252 'Plugin says: ACCEPT. Skipping all other tests') 253 self.action = ACCEPT 254 break 255 elif result == DELETE: 256 suspect.debug( 257 'Plugin DELETES this message - no further tests') 258 self.logger.debug( 259 'Plugin says: DELETE. Skipping all other tests') 260 self.action = DELETE 261 self.message = message 262 self.trash(suspect, str(plugin)) 263 break 264 elif result == REJECT: 265 suspect.debug( 266 'Plugin REJECTS this message - no further tests') 267 self.logger.debug( 268 'Plugin says: REJECT. Skipping all other tests') 269 self.action = REJECT 270 self.message = message 271 break 272 elif result == DEFER: 273 suspect.debug( 274 'Plugin DEFERS this message - no further tests') 275 self.logger.debug( 276 'Plugin says: DEFER. Skipping all other tests') 277 self.action = DEFER 278 self.message = message 279 break 280 else: 281 self.logger.error( 282 'Invalid Message action Code: %s. Using DUNNO' % result) 283 284 except Exception as e: 285 CrashStore.store_exception() 286 exc = traceback.format_exc() 287 self.logger.error('Plugin %s failed: %s' % (str(plugin), exc)) 288 suspect.debug( 289 'Plugin failed : %s . Please check fuglu log for more details' % e) 290 291 def run_prependers(self, suspect): 292 """Run prependers on suspect""" 293 plugcopy = self.plugins[:] 294 for plugin in self.prependers: 295 try: 296 self.logger.debug('Running prepender %s' % plugin) 297 self.set_workerstate( 298 "%s : Running Prepender %s" % (suspect, plugin)) 299 starttime = time.time() 300 result = plugin.pluginlist(suspect, plugcopy) 301 plugintime = time.time() - starttime 302 suspect.tags['scantimes'].append((plugin.section, plugintime)) 303 if result is not None: 304 plugcopyset = set(plugcopy) 305 resultset = set(result) 306 removed = list(plugcopyset - resultset) 307 added = list(resultset - plugcopyset) 308 if len(removed) > 0: 309 self.logger.debug( 310 'Prepender %s removed plugins: %s' % (plugin, list(map(str, removed)))) 311 if len(added) > 0: 312 self.logger.debug( 313 'Prepender %s added plugins: %s' % (plugin, list(map(str, added)))) 314 plugcopy = result 315 316 except Exception: 317 CrashStore.store_exception() 318 exc = traceback.format_exc() 319 self.logger.error( 320 'Prepender plugin %s failed: %s' % (str(plugin), exc)) 321 return plugcopy 322 323 def run_appenders(self, suspect, finaldecision): 324 """Run appenders on suspect""" 325 if suspect.get_tag('noappenders'): 326 return 327 328 for plugin in self.appenders: 329 try: 330 self.logger.debug('Running appender %s' % plugin) 331 suspect.debug('Running appender %s' % plugin) 332 self.set_workerstate( 333 "%s : Running appender %s" % (suspect, plugin)) 334 starttime = time.time() 335 plugin.process(suspect, finaldecision) 336 plugintime = time.time() - starttime 337 suspect.tags['scantimes'].append((plugin.section, plugintime)) 338 except Exception: 339 CrashStore.store_exception() 340 exc = traceback.format_exc() 341 self.logger.error( 342 'Appender plugin %s failed: %s' % (str(plugin), exc)) 343