# $Log$ # Revision 1.8 2011/11/05 15:51:03 customdesigned # New example # # Revision 1.7 2009/06/13 21:15:12 customdesigned # Doxygen updates. # # Revision 1.6 2009/06/09 03:13:13 customdesigned # More doxygen docs. # # Revision 1.5 2005/07/20 14:49:43 customdesigned # Handle corrupt and empty ZIP files. # # Revision 1.4 2005/06/17 01:49:39 customdesigned # Handle zip within zip. # # Revision 1.3 2005/06/02 15:00:17 customdesigned # Configure banned extensions. Scan zipfile option with test case. # # Revision 1.2 2005/06/02 04:18:55 customdesigned # Update copyright notices after reading article on /. # # Revision 1.1.1.4 2005/05/31 18:23:49 customdesigned # Development changes since 0.7.2 # # Revision 1.62 2005/02/14 22:31:17 stuart # _parseparam replacement not needed for python2.4 # # Revision 1.61 2005/02/12 02:11:11 stuart # Pass unit tests with python2.4. # # Revision 1.60 2005/02/11 18:34:14 stuart # Handle garbage after quote in boundary. # # Revision 1.59 2005/02/10 01:10:59 stuart # Fixed MimeMessage.ismodified() # # Revision 1.58 2005/02/10 00:56:49 stuart # Runs with python2.4. Defang not working correctly - more work needed. # # Revision 1.57 2004/11/20 16:37:52 stuart # fix regex for splitting header and body # # Revision 1.56 2004/11/09 20:33:51 stuart # Recognize more dynamic PTR variations. # # Revision 1.55 2004/10/06 21:39:20 stuart # Handle message attachments with boundary errors by not parsing them # until needed. # # Revision 1.54 2004/08/18 01:59:46 stuart # Handle mislabeled multipart messages # # Revision 1.53 2004/04/24 22:53:20 stuart # Rename some local variables to avoid shadowing builtins # # Revision 1.52 2004/04/24 22:47:13 stuart # Convert header values to str # # Revision 1.51 2004/03/25 03:19:10 stuart # Correctly defang rfc822 attachments when boundary specified with # content-type message/rfc822. # # Revision 1.50 2003/10/15 22:01:00 stuart # Test for and work around email bug with encoded filenames. # # Revision 1.49 2003/09/04 18:48:13 stuart # Support python-2.2.3 # # Revision 1.48 2003/09/02 00:27:27 stuart # Should have full milter based dspam support working # # Revision 1.47 2003/08/26 06:08:18 stuart # Use new python boolean since we now require 2.2.2 # # Revision 1.46 2003/08/26 05:01:38 stuart # Release 0.6.0 # # Revision 1.45 2003/08/26 04:01:24 stuart # Use new email module for parsing mail. Still need mime module to # provide various bug fixes to email module, and maintain some compatibility # with old milter code. # ## @package mime # This module provides a "defang" function to replace naughty attachments. # # We also provide workarounds for bugs in the email module that comes # with python. The "bugs" fixed mostly come up only with malformed # messages - but that is what you have when dealing with spam. # Author: Stuart D. Gathman # Copyright 2001,2002,2003,2004,2005 Business Management Systems, Inc. # This code is under the GNU General Public License. See COPYING for details. from __future__ import print_function try: from io import BytesIO, StringIO except: from StringIO import StringIO BytesIO = StringIO import socket import Milter import zipfile import sys import email from email.message import Message try: from email.generator import BytesGenerator from email import message_from_binary_file except: from email.generator import Generator as BytesGenerator from email import message_from_file as message_from_binary_file from email.utils import quote if not getattr(Message,'as_bytes',None): Message.as_bytes = Message.as_string ## Return a list of filenames in a zip file. # Embedded zip files are recursively expanded. def zipnames(txt): fp = BytesIO(txt) zipf = zipfile.ZipFile(fp,'r') names = [] for nm in zipf.namelist(): names.append(('zipname',nm)) if nm.lower().endswith('.zip'): names += zipnames(zipf.read(nm)) return names ## Fix multipart handling in email.Generator. # class MimeGenerator(BytesGenerator): def _dispatch(self, msg): # Get the Content-Type: for the message, then try to dispatch to # self._handle__(). If there's no handler for the # full MIME type, then dispatch to self._handle_(). If # that's missing too, then dispatch to self._writeBody(). main = msg.get_content_maintype() if msg.is_multipart() and main.lower() != 'multipart': self._handle_multipart(msg) else: BytesGenerator._dispatch(self,msg) def unquote(s): """Remove quotes from a string.""" if len(s) > 1: if s.startswith('"'): if s.endswith('"'): s = s[1:-1] else: # remove garbage after trailing quote try: s = s[1:s[1:].index('"')+1] except: return s return s.replace('\\\\', '\\').replace('\\"', '"') if s.startswith('<') and s.endswith('>'): return s[1:-1] return s def _unquotevalue(value): if isinstance(value, tuple): return value[0], value[1], unquote(value[2]) else: return unquote(value) #email.Message._unquotevalue = _unquotevalue from email.message import _parseparam ## Enhance email.message.Message # # Tracks modifications to headers of body or any part independently. class MimeMessage(Message): """Version of email.Message.Message compatible with old mime module """ def __init__(self,fp=None,seekable=1): Message.__init__(self) self.submsg = None self.modified = False ## @var headerchange # Provide a headerchange event for integration with Milter. # The headerchange attribute can be assigned a function to be called when # changing headers. The signature is: # headerchange(msg,name,value) -> None self.headerchange = None def get_param(self, param, failobj=None, header='content-type', unquote=True): val = Message.get_param(self,param,failobj,header,unquote) if val != failobj and param == 'boundary' and unquote: # unquote boundaries an extra time, test case testDefang5 return _unquotevalue(val) return val getfilename = Message.get_filename ismultipart = Message.is_multipart getheaders = Message.get_all gettype = Message.get_content_type getparam = Message.get_param def getparams(self): return self.get_params([]) def getname(self): return self.get_param('name') def getnames(self,scan_zip=False): """Return a list of (attr,name) pairs of attributes that IE might interpret as a name - and hence decide to execute this message.""" names = [] for attr,val in self._get_params_preserve([],'content-type'): if isinstance(val, tuple): # It's an RFC 2231 encoded parameter newvalue = _unquotevalue(val) if val[0]: val = unicode(newvalue[2], newvalue[0]) else: val = unicode(newvalue[2]) else: val = _unquotevalue(val.strip()) names.append((attr,val)) names += [("filename",self.get_filename())] if scan_zip: for key,name in tuple(names): # copy by converting to tuple if name and name.lower().endswith('.zip'): txt = self.get_payload(decode=True) if txt.strip(): names += zipnames(txt) return names def ismodified(self): "True if this message or a subpart has been modified." if not self.is_multipart(): if isinstance(self.submsg,Message): return self.submsg.ismodified() return self.modified if self.modified: return True for i in self.get_payload(): if i.ismodified(): return True return False def dump(self,file,unixfrom=False): "Write this message (and all subparts) to a file" g = MimeGenerator(file) g.flatten(self,unixfrom=unixfrom) def as_bytes(self, unixfrom=False): "Return the entire formatted message as a string." fp = BytesIO() self.dump(fp,unixfrom=unixfrom) return fp.getvalue() def getencoding(self): return self.get('content-transfer-encoding',None) # Decode body to stream according to transfer encoding, return encoding name def decode(self,filt): try: filt.write(self.get_payload(decode=True)) except: pass return self.getencoding() def get_payload_decoded(self): return self.get_payload(decode=True) def __setitem__(self, name, value): rc = Message.__setitem__(self,name,value) self.modified = True if self.headerchange: self.headerchange(self,name,str(value)) return rc def __delitem__(self, name): if self.headerchange: self.headerchange(self,name,None) rc = Message.__delitem__(self,name) self.modified = True return rc def get_payload(self,i=None,decode=False): msg = self.submsg if isinstance(msg,Message) and msg.ismodified(): self.set_payload([msg]) return Message.get_payload(self,i,decode) def set_payload(self, val, charset=None): self.modified = True try: val.seek(0) val = val.read() except: pass Message.set_payload(self,val,charset) self.submsg = None def get_submsg(self): t = self.get_content_type().lower() if t == 'message/rfc822' or t.startswith('multipart/'): if not self.submsg: txt = self.get_payload() if type(txt) == str: txt = self.get_payload(decode=True) self.submsg = email.message_from_string(txt,MimeMessage) for part in self.submsg.walk(): part.modified = False else: self.submsg = txt[0] return self.submsg return None def message_from_file(fp): msg = message_from_binary_file(fp,MimeMessage) for part in msg.walk(): part.modified = False assert not msg.ismodified() return msg extlist = ''.join(""" ade,adp,asd,asx,asp,bas,bat,chm,cmd,com,cpl,crt,dll,exe,hlp,hta,inf,ins,isp,js, jse,lnk,mdb,mde,msc,msi,msp,mst,ocx,pcd,pif,reg,scr,sct,shs,url,vb,vbe,vbs,wsc, wsf,wsh """.split()) bad_extensions = ['.' + x for x in extlist.split(',')] def check_ext(name): "Check a name for dangerous Winblows extensions." if not name: return name lname = name.lower() for ext in bad_extensions: if lname.endswith(ext): return name return None virus_msg = """This message appeared to contain a virus. It was originally named '%s', and has been removed. A copy of your original message was saved as '%s:%s'. See your administrator. """ def check_name(msg,savname=None,ckname=check_ext,scan_zip=False): "Replace attachment with a warning if its name is suspicious." try: for key,name in msg.getnames(scan_zip): badname = ckname(name) if badname: if key == 'zipname': badname = msg.get_filename() break else: return Milter.CONTINUE except zipfile.BadZipfile: # a ZIP that is not a zip is very suspicious badname = msg.get_filename() hostname = socket.gethostname() msg.set_payload(virus_msg % (badname,hostname,savname)) del msg["content-type"] del msg["content-disposition"] del msg["content-transfer-encoding"] name = "WARNING.TXT" msg["Content-Type"] = "text/plain; name="+name return Milter.CONTINUE def check_attachments(msg,check): """Scan attachments. msg MimeMessage check function(MimeMessage): int Return CONTINUE, REJECT, ACCEPT """ if msg.is_multipart(): for i in msg.get_payload(): rc = check_attachments(i,check) if rc != Milter.CONTINUE: return rc return Milter.CONTINUE return check(msg) # save call context for Python without nested_scopes class _defang: def __init__(self,scan_html=True): self.scan_html = scan_html def _chk_name(self,msg): rc = check_name(msg,self._savname,self._check,self.scan_zip) if self.scan_html: check_html(msg,self._savname) # remove scripts from HTML if self.scan_rfc822: msg = msg.get_submsg() if isinstance(msg,Message): return check_attachments(msg,self._chk_name) return rc def __call__(self,msg,savname=None,check=check_ext,scan_rfc822=True, scan_zip=False): """Compatible entry point. Replace all attachments with dangerous names.""" self._savname = savname self._check = check self.scan_rfc822 = scan_rfc822 self.scan_zip = scan_zip check_attachments(msg,self._chk_name) if msg.ismodified(): return True return False # emulate old defang function defang = _defang() if sys.version < '3.0.0': from sgmllib import SGMLParser as HTMLParser else: from Milter.sgmllib import SGMLParser as HTMLParser import re declname = re.compile(r'[a-zA-Z][-_.a-zA-Z0-9]*\s*') declstringlit = re.compile(r'(\'[^\']*\'|"[^"]*")\s*') class SGMLFilter(HTMLParser): """Parse HTML and pass through all constructs unchanged. It is intended for derived classes to implement exceptional processing for selected cases. """ def __init__(self,out): HTMLParser.__init__(self) self.out = out def handle_comment(self,comment): self.out.write("" % comment) def unknown_starttag(self,tag,attr): if hasattr(self,"get_starttag_text"): self.out.write(self.get_starttag_text()) else: self.out.write("<%s" % tag) for (key,val) in attr: self.out.write(' %s="%s"' % (key,val)) self.out.write('>') def handle_data(self,data): self.out.write(data) def handle_entityref(self,ref): self.out.write("&%s;" % ref) def handle_charref(self,ref): self.out.write("&#%s;" % ref) def unknown_endtag(self,tag): self.out.write("" % tag) def handle_special(self,data): self.out.write("" % data) def write(self,buf): "Act like a writer. Why doesn't HTMLParser do this by default?" self.feed(buf) # Python-2.1 sgmllib rejects illegal declarations. Since various Microsoft # products accept and output them, we need to pass them through - # at least until we discover that MS will execute them. # sgmlop-1.1 will not use this method, but calls handle_special to # do what we want. def parse_declaration(self, i): rawdata = self.rawdata n = len(rawdata) j = i + 2 while j < n: c = rawdata[j] if c == ">": # end of declaration syntax self.handle_special(rawdata[i+2:j]) return j + 1 if c in "\"'": m = declstringlit.match(rawdata, j) if not m: # incomplete or an error? return -1 j = m.end() elif c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ": m = declname.match(rawdata, j) if not m: # incomplete or an error? return -1 j = m.end() else: j += 1 # end of buffer between tokens return -1 class HTMLScriptFilter(SGMLFilter): "Remove scripts from an HTML document." def __init__(self,out): SGMLFilter.__init__(self,out) self.ignoring = 0 self.modified = False self.msg = "" def start_script(self,unused): #print('beg script',unused) self.ignoring += 1 self.modified = True def end_script(self): #print('end script') self.ignoring -= 1 if not self.ignoring: self.out.write(self.msg) def handle_data(self,data): if not self.ignoring: SGMLFilter.handle_data(self,data) def handle_comment(self,comment): if not self.ignoring: SGMLFilter.handle_comment(self,comment) def check_html(msg,savname=None): "Remove scripts from HTML attachments." msgtype = msg.get_content_type().lower() # check for more MSIE braindamage if msgtype == 'application/octet-stream': for (attr,name) in msg.getnames(): if name and name.lower().endswith(".htm"): msgtype = 'text/html' if msgtype == 'text/html': out = StringIO() htmlfilter = HTMLScriptFilter(out) try: htmlfilter.write(msg.get_payload(decode=True).decode()) htmlfilter.close() #except sgmllib.SGMLParseError: except: mimetools.copyliteral(msg.get_payload(),open('debug.out','wb')) htmlfilter.close() hostname = socket.gethostname() msg.set_payload( "An HTML attachment could not be parsed. The original is saved as '%s:%s'" % (hostname,savname)) del msg["content-type"] del msg["content-disposition"] del msg["content-transfer-encoding"] name = "WARNING.TXT" msg["Content-Type"] = "text/plain; name="+name return Milter.CONTINUE if htmlfilter.modified: msg.set_payload(out) # remove embedded scripts del msg["content-transfer-encoding"] email.Encoders.encode_quopri(msg) return Milter.CONTINUE if __name__ == '__main__': def _list_attach(msg): t = msg.get_content_type() p = msg.get_payload(decode=True) print(msg.get_filename(),msg.get_content_type(),type(p)) msg = msg.get_submsg() if isinstance(msg,Message): return check_attachments(msg,_list_attach) return Milter.CONTINUE for fname in sys.argv[1:]: fp = open(fname,'rb') msg = message_from_file(fp) email.iterators._structure(msg) check_attachments(msg,_list_attach)