"""Parse bounce messages generated by Postfix.

This also matches something called 'Keftamail' which looks just like Postfix
bounces with the word Postfix scratched out and the word 'Keftamail' written
in in crayon.

It also matches something claiming to be 'The BNS Postfix program', and
'SMTP_Gateway'.  Everybody's gotta be different, huh?
"""

import re

from enum import Enum
from flufl.bounce.interfaces import (
    IBounceDetector, NoFailures, NoTemporaryFailures)
from io import BytesIO
from public import public
from zope.interface import implementer


# Are these heuristics correct or guaranteed?
#
# pcre: salutation line naming the mailer ("the [bns] postfix|keftamail|
#       smtp_gateway"), optionally preceded by spaces or tabs.
# rcre: alternative salutation line, "failure reason:" ending the line.
# acre: an address in angle brackets immediately followed by a colon.
pcre = re.compile(
    b'[ \\t]*the\\s*(bns)?\\s*(postfix|keftamail|smtp_gateway)',
    re.IGNORECASE)
rcre = re.compile(b'failure reason:$', re.IGNORECASE)
acre = re.compile(b'<(?P<addr>[^>]*)>:')

# Top-level content types that `Postfix.process()` is willing to inspect.
REPORT_TYPES = ('multipart/mixed', 'multipart/report')


class ParseState(Enum):
    """States of the line scanner used by `findaddr()`."""

    # No salutation line has been seen yet.
    start = 0
    # A salutation line was seen; following lines may carry addresses.
    salutation_found = 1


def flatten(msg, leaves):
    """Append every leaf (non-multipart) subpart of `msg` to `leaves`.

    The message tree is walked depth-first; `leaves` is modified in place
    and nothing is returned.
    """
    if not msg.is_multipart():
        leaves.append(msg)
        return
    for child in msg.get_payload():
        flatten(child, leaves)


def findaddr(msg):
    """Return the set of addresses listed after the salutation in `msg`.

    The decoded payload of `msg` is scanned line by line.  Everything up to
    and including the first line matching `pcre` or `rcre` is skipped.  From
    then on, each non-empty line is searched with `acre`, and the text found
    between the angle brackets is collected (as bytes).
    """
    found = set()
    state = ParseState.start
    for raw in BytesIO(msg.get_payload(decode=True)):
        # Strip only the right-hand side so leading whitespace is preserved.
        line = raw.rstrip()
        if state is ParseState.start:
            # match() deliberately anchors at the beginning of the line.
            if pcre.match(line) or rcre.match(line):
                state = ParseState.salutation_found
            continue
        # The salutation has been seen; blank lines carry nothing.
        if not line:
            continue
        mo = acre.search(line)
        if mo is not None:
            found.add(mo.group('addr'))
        # Otherwise this is probably a continuation line.
    return found


@public
@implementer(IBounceDetector)
class Postfix:
    """Parse bounce messages generated by Postfix."""

    def process(self, msg):
        """See `IBounceDetector`.

        Returns `NoFailures` when `msg` is not one of `REPORT_TYPES` or has
        no text/plain leaf whose Content-Description is 'notification'.
        Otherwise returns a 2-tuple of `NoTemporaryFailures` and the set of
        addresses that `findaddr()` extracts from the first such leaf.
        """
        if msg.get_content_type() not in REPORT_TYPES:
            return NoFailures
        # We're looking for the text/plain subpart with a
        # Content-Description: of 'notification'.
        candidates = []
        flatten(msg, candidates)
        for leaf in candidates:
            kind = leaf.get_content_type()
            description = leaf.get('content-description', '').lower()
            if (kind, description) == ('text/plain', 'notification'):
                return NoTemporaryFailures, set(findaddr(leaf))
        return NoFailures