"""Score a MIME message by the bounce-related headers found in its parts."""
from collections import deque
from contextlib import closing

import attr
import regex as re
import six
from attr.validators import instance_of
from six.moves import range

from flanker.mime.message.headers import MimeHeaders
from flanker.mime.message.headers.parsing import parse_stream


# Header names looked up on every part of the message.  The score computed
# by detect() is the number of collected headers divided by the size of
# this tuple.
_HEADERS = ('Action',
            'Content-Description',
            'Diagnostic-Code',
            'Final-Recipient',
            'Received',
            'Remote-Mta',
            'Reporting-Mta',
            'Status')

# One digit followed by two dot-separated digit groups (e.g. "5.1.1").
# It is applied with .match(), so only the start of the value is anchored.
_RE_STATUS = re.compile(r'\d\.\d+\.\d+', re.IGNORECASE)


@attr.s(frozen=True)
class Result(object):
    """Immutable outcome of :func:`detect`; every field is type-validated."""

    # Collected header count divided by len(_HEADERS).
    score = attr.ib(validator=instance_of(float))
    # First 'Status' value accepted by _RE_STATUS, or u'' when none is.
    status = attr.ib(validator=instance_of(six.text_type))
    # Value of the 'Diagnostic-Code' header, or u'' when it is absent.
    diagnostic_code = attr.ib(validator=instance_of(six.text_type))
    # Body of the part described as "notification", or u'' when none is.
    notification = attr.ib(validator=instance_of(six.text_type))

    def is_bounce(self, probability=0.3):
        """Return True when the score is strictly greater than *probability*."""
        return self.score > probability


def detect(message):
    """Build a :class:`Result` describing *message*.

    The score is the number of headers gathered by ``_collect_headers``
    divided by the number of names in ``_HEADERS``.
    """
    found = _collect_headers(message)
    ratio = len(found) / float(len(_HEADERS))
    return Result(score=ratio,
                  status=_get_status(found),
                  diagnostic_code=found.get('Diagnostic-Code', u''),
                  notification=_get_notification(message))


def _collect_headers(message):
    """Gather the ``_HEADERS`` entries from every part of *message*.

    Parts are visited with ``walk(with_self=True)``.  For a part whose
    content type reports ``is_delivery_status()``, the headers parsed out of
    its body are appended as well.  Returns a ``MimeHeaders`` built from the
    collected ``(name, value)`` pairs.
    """
    pairs = deque()
    for part in message.walk(with_self=True):
        pairs.extend((name, part.headers[name])
                     for name in _HEADERS
                     if name in part.headers)
        if part.content_type.is_delivery_status():
            pairs.extend(_collect_headers_from_status(part.body))

    return MimeHeaders(pairs)


def _collect_headers_from_status(body):
    """Parse headers out of a delivery-status *body* and return them as a deque."""
    parsed = deque()
    with closing(six.StringIO(body)) as stream:
        # Three consecutive parse_stream() calls on the same stream, each
        # continuing where the previous one stopped.
        # NOTE(review): presumably one call per header group in the status
        # body -- confirm why exactly three.
        for _ in range(3):
            parsed.extend(parse_stream(stream))

    return parsed


def _get_status(headers):
    """Return the first 'Status' value that looks like a status code.

    Each value is stripped before being matched against ``_RE_STATUS``, but
    the value is returned as stored (unstripped).  Returns ``u''`` when no
    value matches.
    """
    return next((value for value in headers.getall('Status')
                 if _RE_STATUS.match(value.strip())),
                u'')


def _get_notification(message):
    """Return the body of the first part described as a notification.

    A part qualifies when its 'Content-Description' header, lower-cased,
    equals ``'notification'``.  Returns ``u''`` when no part qualifies.
    """
    for part in message.walk():
        description = part.headers.get('Content-Description', '')
        if description.lower() != 'notification':
            continue
        return part.body

    return u''